# License for the specific language governing permissions and limitations
# under the License.
-"""Ceph Backup Service Implementation"""
+"""Ceph Backup Service Implementation.
-import os
-import time
+This driver supports backing up volumes of any type to a Ceph backend store. It
+is also capable of detecting whether the volume to be backed up is a Ceph RBD
+volume and if so, attempts to perform incremental/differential backups.
+
+Support is also included for the following in the case of source volume being a
+Ceph RBD volume:
+
+ * backing up within the same Ceph pool (not recommended)
+ * backing up between different Ceph pools
+ * backing up between different Ceph clusters
+
+At the time of writing, differential backup support in Ceph/librbd was quite
+new so this driver accounts for this by first attempting differential backup
+and falling back to full backup/copy if the former fails.
+
+If incremental backups are used, multiple backups of the same volume are stored
+as snapshots so that minimal space is consumed in the backup store and
+restoring the volume takes a far reduced amount of time compared to a full
+copy.
+
+Note that Cinder supports restoring to a new volume or the original volume the
+backup was taken from. For the latter case, a full copy is enforced since this
+was deemed the safest action to take. It is therefore recommended to always
+restore to a new volume (default).
+"""
import eventlet
-from oslo.config import cfg
+import os
+import re
+import time
from cinder.backup.driver import BackupDriver
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import units
-import cinder.volume.drivers.rbd as rbddriver
+from cinder import utils
+import cinder.volume.drivers as drivers
+from oslo.config import cfg
try:
import rados
class CephBackupDriver(BackupDriver):
- """Backup up Cinder volumes to Ceph Object Store"""
+ """Backup up Cinder volumes to Ceph Object Store.
- def __init__(self, context, db_driver=None):
+ This class enables backing up Cinder volumes to a Ceph object store.
+ Backups may be stored in their own pool or even cluster. Store location is
+ defined by the Ceph conf file and Service config options supplied.
+
+ If the source volume is itself an RBD volume, the backup will be performed
+ using incremental differential backups which *should* give a performance
+ gain.
+ """
+
+ def __init__(self, context, db_driver=None, execute=None):
super(CephBackupDriver, self).__init__(db_driver)
self.rbd = rbd
self.rados = rados
self.context = context
self.chunk_size = CONF.backup_ceph_chunk_size
- if self._supports_stripingv2():
+ self._execute = execute or utils.execute
+
+ if self._supports_stripingv2:
self.rbd_stripe_unit = CONF.backup_ceph_stripe_unit
self.rbd_stripe_count = CONF.backup_ceph_stripe_count
else:
- LOG.info("rbd striping not supported - ignoring conf settings "
- "for rbd striping")
+ LOG.info(_("rbd striping not supported - ignoring configuration "
+ "settings for rbd striping"))
self.rbd_stripe_count = 0
self.rbd_stripe_unit = 0
- self._ceph_user = str(CONF.backup_ceph_user)
- self._ceph_pool = str(CONF.backup_ceph_pool)
- self._ceph_conf = str(CONF.backup_ceph_conf)
+ self._ceph_backup_user = str(CONF.backup_ceph_user)
+ self._ceph_backup_pool = str(CONF.backup_ceph_pool)
+ self._ceph_backup_conf = str(CONF.backup_ceph_conf)
- def _supports_layering(self):
- """
- Determine whether copy-on-write is supported by our version of librbd
+ def _validate_string_args(self, *args):
+ """Ensure all args are non-None and non-empty."""
+ return all(args)
+
+ def _ceph_args(self, user, conf=None, pool=None):
+ """Create default ceph args for executing rbd commands.
+
+ If no --conf is provided, rbd will look in the default locations e.g.
+ /etc/ceph/ceph.conf
"""
+
+ # Make sure user arg is valid since rbd command may not fail if
+ # invalid/no user provided, resulting in unexpected behaviour.
+ if not self._validate_string_args(user):
+ raise exception.BackupInvalidCephArgs(_("invalid user '%s'") %
+ (user))
+
+ args = ['--id', user]
+ if conf:
+ args += ['--conf', conf]
+ if pool:
+ args += '--pool', pool
+
+ return args
+
+ @property
+ def _supports_layering(self):
+ """Determine if copy-on-write is supported by our version of librbd."""
return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
+ @property
def _supports_stripingv2(self):
- """
- Determine whether striping is supported by our version of librbd
- """
+ """Determine if striping is supported by our version of librbd."""
return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2')
def _get_rbd_support(self):
+ """Determine RBD features supported by our version of librbd."""
old_format = True
features = 0
- if self._supports_layering():
+ if self._supports_layering:
old_format = False
features |= self.rbd.RBD_FEATURE_LAYERING
- if self._supports_stripingv2():
+ if self._supports_stripingv2:
old_format = False
features |= self.rbd.RBD_FEATURE_STRIPINGV2
return (old_format, features)
def _connect_to_rados(self, pool=None):
- """Establish connection to the Ceph cluster"""
- client = self.rados.Rados(rados_id=self._ceph_user,
- conffile=self._ceph_conf)
+ """Establish connection to the backup Ceph cluster."""
+ client = self.rados.Rados(rados_id=self._ceph_backup_user,
+ conffile=self._ceph_backup_conf)
try:
client.connect()
- pool_to_open = str(pool or self._ceph_pool)
+ pool_to_open = str(pool or self._ceph_backup_pool)
ioctx = client.open_ioctx(pool_to_open)
return client, ioctx
except self.rados.Error:
raise
def _disconnect_from_rados(self, client, ioctx):
- """Terminate connection with the Ceph cluster"""
+ """Terminate connection with the backup Ceph cluster."""
# closing an ioctx cannot raise an exception
ioctx.close()
client.shutdown()
- def _get_backup_base_name(self, volume_id, backup_id):
- """Return name of base image used for backup."""
- # Ensure no unicode
- return str("volume-%s.backup.%s" % (volume_id, backup_id))
+ def _get_backup_base_name(self, volume_id, backup_id=None,
+ diff_format=False):
+ """Return name of base image used for backup.
- def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False):
- """
- Transfer data between file and rbd. If destination is rbd, source is
- assumed to be file, otherwise source is assumed to be rbd.
+ Incremental backups use a new base name so we support old and new style
+ format.
"""
+ # Ensure no unicode
+ if diff_format:
+ return str("volume-%s.backup.base" % (volume_id))
+ else:
+ if backup_id is None:
+ msg = _("backup_id required")
+ raise exception.InvalidParameterValue(msg)
+ return str("volume-%s.backup.%s" % (volume_id, backup_id))
+
+ def _transfer_data(self, src, src_name, dest, dest_name, length):
+ """Transfer data between files (Python IO objects)."""
+ LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") %
+ {'src': src_name, 'dest': dest_name})
+
chunks = int(length / self.chunk_size)
- LOG.debug("transferring %s chunks of %s bytes to '%s'" %
- (chunks, self.chunk_size, dest_name))
+ LOG.debug(_("%(chunks)s chunks of %(bytes)s bytes to be transferred") %
+ {'chunks': chunks, 'bytes': self.chunk_size})
+
for chunk in xrange(0, chunks):
- offset = chunk * self.chunk_size
before = time.time()
-
- if dest_is_rbd:
- dest.write(src.read(self.chunk_size), offset)
- # note(dosaboy): librbd writes are synchronous so flush() will
- # have not effect. Also, flush only supported in more recent
- # versions of librbd.
- else:
- dest.write(src.read(offset, self.chunk_size))
- dest.flush()
-
+ data = src.read(self.chunk_size)
+ dest.write(data)
+ dest.flush()
delta = (time.time() - before)
rate = (self.chunk_size / delta) / 1024
- LOG.debug("transferred chunk %s of %s (%dK/s)" %
- (chunk, chunks, rate))
+ LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s "
+ "(%(rate)dK/s)") %
+ {'chunk': chunk, 'chunks': chunks,
+ 'rate': rate}))
# yield to any other pending backups
eventlet.sleep(0)
rem = int(length % self.chunk_size)
if rem:
- LOG.debug("transferring remaining %s bytes" % (rem))
- offset = (length - rem)
- if dest_is_rbd:
- dest.write(src.read(rem), offset)
- # note(dosaboy): librbd writes are synchronous so flush() will
- # have not effect. Also, flush only supported in more recent
- # versions of librbd.
- else:
- dest.write(src.read(offset, rem))
- dest.flush()
-
+ LOG.debug(_("transferring remaining %s bytes") % (rem))
+ data = src.read(rem)
+ dest.write(data)
+ dest.flush()
# yield to any other pending backups
eventlet.sleep(0)
- def _backup_volume_from_file(self, backup_name, backup_size, volume_file):
- """Backup a volume from file stream"""
- LOG.debug("performing backup from file")
+ def _create_base_image(self, name, size, rados_client):
+ """Create a base backup image.
+ This will be the base image used for storing differential exports.
+ """
+ LOG.debug(_("creating base image '%s'") % (name))
old_format, features = self._get_rbd_support()
+ self.rbd.RBD().create(ioctx=rados_client.ioctx,
+ name=name,
+ size=size,
+ old_format=old_format,
+ features=features,
+ stripe_unit=self.rbd_stripe_unit,
+ stripe_count=self.rbd_stripe_count)
+
+ def _delete_backup_snapshots(self, rados_client, base_name, backup_id):
+ """Delete any snapshots associated with this backup.
+
+ A backup should have at most *one* associated snapshot.
+
+ This is required before attempting to delete the base image. The
+ snapshots on the original volume can be left as they will be purged
+ when the volume is deleted.
+ """
+ backup_snaps = None
+ base_rbd = self.rbd.Image(rados_client.ioctx, base_name)
+ try:
+ snap_name = self._get_backup_snap_name(base_rbd, base_name,
+ backup_id)
+ if snap_name:
+ LOG.debug(_("deleting backup snapshot='%s'") % (snap_name))
+ base_rbd.remove_snap(snap_name)
+ else:
+ LOG.debug(_("no backup snapshot to delete"))
- with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+ # Now check whether any snapshots remain on the base image
+ backup_snaps = self.get_backup_snaps(base_rbd)
+ finally:
+ base_rbd.close()
+
+ if backup_snaps:
+ return len(backup_snaps)
+ else:
+ return 0
+
+ def _try_delete_base_image(self, backup_id, volume_id, base_name=None):
+ """Try to delete backup RBD image.
+
+ If the rbd image is a base image for incremental backups, it may have
+ snapshots. Delete the snapshot associated with backup_id and if the
+ image has no more snapshots, delete it. Otherwise return.
+
+ If no base name is provided try normal (full) format then diff format
+ image name.
+
+ If a base name is provided but does not exist, ImageNotFound will be
+ raised.
+
+ If the image is busy, a number of retries will be performed if
+ ImageBusy is received, after which the exception will be propagated to
+ the caller.
+ """
+ retries = 3
+ delay = 5
+ try_diff_format = False
+
+ if base_name is None:
+ try_diff_format = True
+
+ base_name = self._get_backup_base_name(volume_id, backup_id)
+ LOG.debug(_("trying diff format name format basename='%s'") %
+ (base_name))
+
+ with drivers.rbd.RADOSClient(self) as client:
+ rbd_exists, base_name = \
+ self._rbd_image_exists(base_name, volume_id, client,
+ try_diff_format=try_diff_format)
+ if not rbd_exists:
+ raise self.rbd.ImageNotFound(_("image %s not found") %
+ (base_name))
+
+ while retries >= 0:
+ # First delete associated snapshot (if exists)
+ rem = self._delete_backup_snapshots(client, base_name,
+ backup_id)
+ if rem:
+ msg = (_("base image still has %s snapshots so not "
+ "deleting base image") % (rem))
+ LOG.info(msg)
+ return
+
+ LOG.info(_("deleting base image='%s'") % (base_name))
+ # Delete base if no more snapshots
+ try:
+ self.rbd.RBD().remove(client.ioctx, base_name)
+ except self.rbd.ImageBusy as exc:
+ # Allow a retry if the image is busy
+ if retries > 0:
+ LOG.info((_("image busy, retrying %(retries)s "
+ "more time(s) in %(delay)ss") %
+ {'retries': retries, 'delay': delay}))
+ eventlet.sleep(delay)
+ else:
+ LOG.error(_("max retries reached - raising error"))
+ raise exc
+ else:
+ LOG.debug(_("base backup image='%s' deleted)") %
+ (base_name))
+ retries = 0
+ finally:
+ retries -= 1
+
+ def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
+ src_user, src_conf, dest_user, dest_conf,
+ src_snap=None, from_snap=None):
+ """Backup only extents changed between two points.
+
+ If no snapshot is provided, the diff extents will be all those changed
+ since the rbd volume/base was created, otherwise it will be those
+ changed since the snapshot was created.
+ """
+ LOG.debug(_("performing differential transfer from '%(src)s' to "
+ "'%(dest)s'") %
+ {'src': src_name, 'dest': dest_name})
+
+ # NOTE(dosaboy): Need to be tolerant of clusters/clients that do
+ # not support these operations since at the time of writing they
+ # were very new.
+
+ src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
+ dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
+
+ try:
+ cmd = ['rbd', 'export-diff'] + src_ceph_args
+ if from_snap is not None:
+ cmd += ['--from-snap', from_snap]
+ if src_snap:
+ path = str("%s/%s@%s" % (src_pool, src_name, src_snap))
+ else:
+ path = str("%s/%s" % (src_pool, src_name))
+ cmd += [path, '-']
+ out, err = self._execute(*cmd)
+ except (exception.ProcessExecutionError, exception.Error) as exc:
+ LOG.info(_("rbd export-diff failed - %s") % (str(exc)))
+ raise exception.BackupRBDOperationFailed("rbd export-diff failed")
+
+ try:
+ cmd = ['rbd', 'import-diff'] + dest_ceph_args
+ cmd += ['-', str("%s/%s" % (dest_pool, dest_name))]
+ out, err = self._execute(*cmd, process_input=out)
+ except (exception.ProcessExecutionError, exception.Error) as exc:
+ LOG.info(_("rbd import-diff failed - %s") % (str(exc)))
+ raise exception.BackupRBDOperationFailed("rbd import-diff failed")
+
+ def _rbd_image_exists(self, name, volume_id, client,
+ try_diff_format=False):
+ """Return tuple (exists, name)."""
+ rbds = self.rbd.RBD().list(client.ioctx)
+ if name not in rbds:
+ msg = _("image '%s' not found - trying diff format name") % (name)
+ LOG.debug(msg)
+ if try_diff_format:
+ name = self._get_backup_base_name(volume_id, diff_format=True)
+ if name not in rbds:
+ msg = _("diff format image '%s' not found") % (name)
+ LOG.debug(msg)
+ return False, name
+ else:
+ return False, name
+
+ return True, name
+
+ def _snap_exists(self, base_name, snap_name, client):
+ """Return True if snapshot exists in base image."""
+ base_rbd = self.rbd.Image(client.ioctx, base_name)
+ try:
+ snaps = base_rbd.list_snaps()
+ finally:
+ base_rbd.close()
+
+ if snaps is None:
+ return False
+
+ for snap in snaps:
+ if snap['name'] == snap_name:
+ return True
+
+ return False
+
+ def _backup_rbd(self, backup_id, volume_id, volume_file, volume_name,
+ length):
+ """Create a incremental backup from an RBD image."""
+ rbd_user = volume_file.rbd_user
+ rbd_pool = volume_file.rbd_pool
+ rbd_conf = volume_file.rbd_conf
+ source_rbd_image = volume_file.rbd_image
+
+ # Identify our --from-snap point (if one exists)
+ from_snap = self._get_most_recent_snap(source_rbd_image)
+ LOG.debug(_("using --from-snap '%s'") % from_snap)
+
+ backup_name = self._get_backup_base_name(volume_id, diff_format=True)
+ image_created = False
+ force_full_backup = False
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+ # If from_snap does not exist in the destination, this implies a
+ # previous backup has failed. In this case we will force a full
+ # backup.
+ #
+ # TODO(dosaboy): find a way to repair the broken backup
+ #
+ if backup_name not in self.rbd.RBD().list(ioctx=client.ioctx):
+ # If a from_snap is defined then we cannot proceed (see above)
+ if from_snap is not None:
+ force_full_backup = True
+
+ # Create new base image
+ self._create_base_image(backup_name, length, client)
+ image_created = True
+ else:
+ # If a from_snap is defined but does not exist in the back base
+ # then we cannot proceed (see above)
+ if not self._snap_exists(backup_name, from_snap, client):
+ force_full_backup = True
+
+ if force_full_backup:
+ errmsg = (_("snap='%(snap)s' does not exist in base "
+ "image='%(base)s' - aborting incremental backup") %
+ {'snap': from_snap, 'base': backup_name})
+ LOG.info(errmsg)
+ # Raise this exception so that caller can try another
+ # approach
+ raise exception.BackupRBDOperationFailed(errmsg)
+
+ # Snapshot source volume so that we have a new point-in-time
+ new_snap = self._get_new_snap_name(backup_id)
+ LOG.debug(_("creating backup snapshot='%s'") % (new_snap))
+ source_rbd_image.create_snap(new_snap)
+
+ # Attempt differential backup. If this fails, perhaps because librbd
+ # or Ceph cluster version does not support it, do a full backup
+ # instead.
+ #
+ # TODO(dosaboy): find a way to determine if the operation is supported
+ # rather than brute force approach.
+ try:
+ before = time.time()
+ self._rbd_diff_transfer(volume_name, rbd_pool, backup_name,
+ self._ceph_backup_pool,
+ src_user=rbd_user,
+ src_conf=rbd_conf,
+ dest_user=self._ceph_backup_user,
+ dest_conf=self._ceph_backup_conf,
+ src_snap=new_snap,
+ from_snap=from_snap)
+
+ LOG.debug(_("differential backup transfer completed in %.4fs") %
+ (time.time() - before))
+
+ # We don't need the previous snapshot (if there was one) anymore so
+ # delete it.
+ if from_snap:
+ source_rbd_image.remove_snap(from_snap)
+
+ except exception.BackupRBDOperationFailed:
+ LOG.debug(_("differential backup transfer failed"))
+
+ # Clean up if image was created as part of this operation
+ if image_created:
+ self._try_delete_base_image(backup_id, volume_id,
+ base_name=backup_name)
+
+ # Delete snapshot
+ LOG.debug(_("deleting backup snapshot='%s'") % (new_snap))
+ source_rbd_image.remove_snap(new_snap)
+
+ # Re-raise the exception so that caller can try another approach
+ raise
+
+ def _file_is_rbd(self, volume_file):
+ """Returns True if the volume_file is actually an RBD image."""
+ return hasattr(volume_file, 'rbd_image')
+
+ def _full_backup(self, backup_id, volume_id, src_volume, src_name, length):
+ """Perform a full backup of src volume.
+
+ First creates a base backup image in our backup location then performs
+ an chunked copy of all data from source volume to a new backup rbd
+ image.
+ """
+ backup_name = self._get_backup_base_name(volume_id, backup_id)
+
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+ # First create base backup image
+ old_format, features = self._get_rbd_support()
+ LOG.debug(_("creating base image='%s'") % (backup_name))
self.rbd.RBD().create(ioctx=client.ioctx,
name=backup_name,
- size=backup_size,
+ size=length,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
+ LOG.debug(_("copying data"))
dest_rbd = self.rbd.Image(client.ioctx, backup_name)
try:
- self._transfer_data(volume_file, dest_rbd, backup_name,
- backup_size, dest_is_rbd=True)
+ rbd_meta = drivers.rbd.RBDImageMetadata(dest_rbd,
+ self._ceph_backup_pool,
+ self._ceph_backup_user,
+ self._ceph_backup_conf)
+ rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
+ self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
+ length)
finally:
dest_rbd.close()
+ @staticmethod
+ def backup_snapshot_name_pattern():
+ """Returns the pattern used to match backup snapshots.
+
+ It is essential that snapshots created for purposes other than backups
+ do not have this name format.
+ """
+ return r"^backup\.([a-z0-9\-]+?)\.snap\.(.+)$"
+
+ @classmethod
+ def get_backup_snaps(cls, rbd_image, sort=False):
+ """Get all backup snapshots for the given rbd image.
+
+ NOTE: this call is made public since these snapshots must be deleted
+ before the base volume can be deleted.
+ """
+ snaps = rbd_image.list_snaps()
+
+ backup_snaps = []
+ for snap in snaps:
+ search_key = cls.backup_snapshot_name_pattern()
+ result = re.search(search_key, snap['name'])
+ if result:
+ backup_snaps.append({'name': result.group(0),
+ 'backup_id': result.group(1),
+ 'timestamp': result.group(2)})
+
+ if sort:
+ # Sort into ascending order of timestamp
+ backup_snaps.sort(key=lambda x: x['timestamp'], reverse=True)
+
+ return backup_snaps
+
+ def _get_new_snap_name(self, backup_id):
+ return str("backup.%s.snap.%s" % (backup_id, time.time()))
+
+ def _get_backup_snap_name(self, rbd_image, name, backup_id):
+ """Return the name of the snapshot associated with backup_id.
+
+ The rbd image provided must be the base image used for an incremental
+ backup.
+
+ A back is only allowed to have one associated snapshot. If more than
+ one is found, exception.BackupOperationError is raised.
+ """
+ snaps = self.get_backup_snaps(rbd_image)
+
+ LOG.debug(_("looking for snapshot of backup base '%s'") % (name))
+
+ if not snaps:
+ LOG.debug(_("backup base '%s' has no snapshots") % (name))
+ return None
+
+ snaps = [snap['name'] for snap in snaps
+ if snap['backup_id'] == backup_id]
+
+ if not snaps:
+ LOG.debug(_("backup '%s' has no snapshot") % (backup_id))
+ return None
+
+ if len(snaps) > 1:
+ msg = (_("backup should only have one snapshot but instead has %s")
+ % (len(snaps)))
+ LOG.error(msg)
+ raise exception.BackupOperationError(msg)
+
+ LOG.debug(_("found snapshot '%s'") % (snaps[0]))
+ return snaps[0]
+
+ def _get_most_recent_snap(self, rbd_image):
+ """Get the most recent backup snapshot of the provided image.
+
+ Returns name of most recent backup snapshot or None if there are no
+ backup snapshot.
+ """
+ backup_snaps = self.get_backup_snaps(rbd_image, sort=True)
+ if not backup_snaps:
+ return None
+
+ return backup_snaps[0]['name']
+
+ def _get_volume_size_gb(self, volume):
+ """Return the size in gigabytes of the given volume.
+
+ Raises exception.InvalidParameterValue if voluem size is 0.
+ """
+ if int(volume['size']) == 0:
+ raise exception.InvalidParameterValue("need non-zero volume size")
+
+ return int(volume['size']) * units.GiB
+
def backup(self, backup, volume_file):
- """Backup the given volume to Ceph object store"""
+ """Backup the given volume to Ceph object store.
+
+ If the source volume is an RBD we will attempt to do an
+ incremental/differential backup, otherwise a full copy is performed.
+ If this fails we will attempt to fall back to full copy.
+ """
backup_id = backup['id']
volume = self.db.volume_get(self.context, backup['volume_id'])
- backup_name = self._get_backup_base_name(volume['id'], backup_id)
+ volume_id = volume['id']
+ volume_name = volume['name']
- LOG.debug("Starting backup of volume='%s' to rbd='%s'" %
- (volume['name'], backup_name))
+ LOG.debug(_("Starting backup of volume='%s'") % volume_name)
- if int(volume['size']) == 0:
- raise exception.InvalidParameterValue("need non-zero volume size")
- else:
- backup_size = int(volume['size']) * units.GiB
+ # Ensure we are at the beginning of the volume
+ volume_file.seek(0)
+ length = self._get_volume_size_gb(volume)
- if volume_file:
- self._backup_volume_from_file(backup_name, backup_size,
- volume_file)
+ do_full_backup = False
+ if self._file_is_rbd(volume_file):
+ # If volume an RBD, attempt incremental backup.
+ try:
+ self._backup_rbd(backup_id, volume_id, volume_file,
+ volume_name, length)
+ except exception.BackupRBDOperationFailed:
+ LOG.debug(_("forcing full backup"))
+ do_full_backup = True
else:
- errmsg = ("No volume_file was provided so I cannot do requested "
- "backup (id=%s)" % (backup_id))
- raise exception.BackupVolumeInvalidType(errmsg)
+ do_full_backup = True
- self.db.backup_update(self.context, backup['id'],
- {'container': self._ceph_pool})
+ if do_full_backup:
+ self._full_backup(backup_id, volume_id, volume_file,
+ volume_name, length)
- LOG.debug(_("backup '%s' finished.") % (backup_id))
+ self.db.backup_update(self.context, backup_id,
+ {'container': self._ceph_backup_pool})
- def restore(self, backup, volume_id, volume_file):
- """Restore the given volume backup from Ceph object store"""
- volume = self.db.volume_get(self.context, volume_id)
- backup_name = self._get_backup_base_name(backup['volume_id'],
- backup['id'])
+ LOG.debug(_("backup '%s' finished.") % (backup_id))
- LOG.debug('starting backup restore from Ceph backup=%s '
- 'to volume=%s' % (backup['id'], volume['name']))
+ def _full_restore(self, backup_id, volume_id, dest_file, dest_name,
+ length, src_snap=None):
+ """Restore the given volume file from backup RBD.
- # Ensure we are at the beginning of the volume
- volume_file.seek(0)
+ This will result in all extents being copied from source to destination
+ """
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
- backup_size = int(volume['size']) * units.GiB
+ if src_snap:
+ # If a source snapshot is provided we assume the base is diff
+ # format.
+ backup_name = self._get_backup_base_name(volume_id,
+ diff_format=True)
+ else:
+ backup_name = self._get_backup_base_name(volume_id, backup_id)
- with rbddriver.RADOSClient(self, self._ceph_pool) as client:
- src_rbd = self.rbd.Image(client.ioctx, backup_name)
+ # Retrieve backup volume
+ src_rbd = self.rbd.Image(client.ioctx, backup_name,
+ snapshot=src_snap)
try:
- self._transfer_data(src_rbd, volume_file, volume['name'],
- backup_size)
+ rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd,
+ self._ceph_backup_pool,
+ self._ceph_backup_user,
+ self._ceph_backup_conf)
+ rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
+ self._transfer_data(rbd_fd, backup_name, dest_file, dest_name,
+ length)
finally:
src_rbd.close()
- # Be tolerant to IO implementations that do not support fileno()
+ def _restore_rbd(self, base_name, volume_file, volume_name, restore_point):
+ """Restore RBD volume from RBD image."""
+ rbd_user = volume_file.rbd_user
+ rbd_pool = volume_file.rbd_pool
+ rbd_conf = volume_file.rbd_conf
+
+ LOG.debug(_("trying incremental restore from base='%(base)s' "
+ "snap='%(snap)s'") %
+ {'base': base_name, 'snap': restore_point})
+ before = time.time()
try:
- fileno = volume_file.fileno()
- except IOError:
- LOG.info("volume_file does not support fileno() so skipping "
- "fsync()")
+ self._rbd_diff_transfer(base_name, self._ceph_backup_pool,
+ volume_name, rbd_pool,
+ src_user=self._ceph_backup_user,
+ src_conf=self._ceph_backup_conf,
+ dest_user=rbd_user, dest_conf=rbd_conf,
+ src_snap=restore_point)
+ except exception.BackupRBDOperationFailed:
+ LOG.exception(_("differential restore failed, trying full "
+ "restore"))
+ raise
+
+ LOG.debug(_("restore transfer completed in %.4fs") %
+ (time.time() - before))
+
+ def _num_backup_snaps(self, backup_base_name):
+ """Return the number of snapshots that exist on the base image."""
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+ base_rbd = self.rbd.Image(client.ioctx, backup_base_name)
+ try:
+ snaps = self.get_backup_snaps(base_rbd)
+ finally:
+ base_rbd.close()
+
+ if snaps:
+ return len(snaps)
else:
- os.fsync(fileno)
+ return 0
- LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id))
+ def _get_restore_point(self, base_name, backup_id):
+ """Get restore point snapshot name for incremental backup.
- def delete(self, backup):
- """Delete the given backup from Ceph object store"""
+ If the backup was not incremental None is returned.
+ """
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+ base_rbd = self.rbd.Image(client.ioctx, base_name)
+ try:
+ restore_point = self._get_backup_snap_name(base_rbd, base_name,
+ backup_id)
+ finally:
+ base_rbd.close()
+
+ return restore_point
+
+ def _rbd_has_extents(self, rbd_volume):
+ """Check whether the given rbd volume has extents.
+
+ Return True if has extents, otherwise False.
+ """
+ extents = []
+
+ def iter_cb(offset, length, exists):
+ if exists:
+ extents.append(length)
+
+ rbd_volume.diff_iterate(0, rbd_volume.size(), None, iter_cb)
+
+ if extents:
+ LOG.debug("rbd has %s extents" % (sum(extents)))
+ return True
+
+ return False
+
+ def _diff_restore_allowed(self, base_name, backup, volume, volume_file,
+ rados_client):
+ """Determine whether a differential restore is possible/allowed.
+
+ In order for a differential restore to be performed we need:
+ * destination volume must be RBD
+ * destination volume must have zero extents
+ * backup base image must exist
+ * backup must have a restore point
+
+ Returns True if differential restore is allowed, False otherwise.
+ """
+ not_allowed = (False, None)
+
+ # If the volume we are restoring to is the volume the backup was made
+ # from, force a full restore since a diff will not work in this case.
+ if volume['id'] == backup['volume_id']:
+ LOG.debug("dest volume is original volume - forcing full copy")
+ return not_allowed
+
+ if self._file_is_rbd(volume_file):
+ rbd_exists, base_name = self._rbd_image_exists(base_name,
+ backup['volume_id'],
+ rados_client)
+
+ if not rbd_exists:
+ return not_allowed
+
+ # Get the restore point. If no restore point is found, we assume
+ # that the backup was not performed using diff/incremental methods
+ # so we enforce full copy.
+ restore_point = self._get_restore_point(base_name, backup['id'])
+ if restore_point:
+ # If the destination volume has extents we cannot allow a diff
+ # restore.
+ if self._rbd_has_extents(volume_file.rbd_image):
+ # We return the restore point so that a full copy is done
+ # from snapshot.
+ LOG.debug("destination has extents - forcing full copy")
+ return False, restore_point
+
+ return True, restore_point
+ else:
+ LOG.info(_("no restore point found for backup='%s', forcing "
+ "full copy") % (backup['id']))
+
+ return not_allowed
+
+ def _try_restore(self, backup, volume, volume_file):
+ """Attempt to restore volume from backup."""
+ volume_name = volume['name']
backup_id = backup['id']
- backup_name = self._get_backup_base_name(backup['volume_id'],
- backup_id)
+ backup_volume_id = backup['volume_id']
+ length = int(volume['size']) * units.GiB
- LOG.debug('delete started for backup=%s', backup['id'])
+ base_name = self._get_backup_base_name(backup['volume_id'],
+ diff_format=True)
+
+ with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+ diff_restore, restore_point = \
+ self._diff_restore_allowed(base_name, backup, volume,
+ volume_file, client)
+
+ if diff_restore:
+ try:
+ do_full_restore = False
+ self._restore_rbd(base_name, volume_file, volume_name,
+ restore_point)
+ except exception.BackupRBDOperationFailed:
+ LOG.debug(_("forcing full restore"))
+ do_full_restore = True
+ else:
+ do_full_restore = True
+
+ if do_full_restore:
+ # Otherwise full copy
+ self._full_restore(backup_id, backup_volume_id, volume_file,
+ volume_name, length, src_snap=restore_point)
+
+ def restore(self, backup, volume_id, volume_file):
+ """Restore the given volume backup from Ceph object store."""
+ target_volume = self.db.volume_get(self.context, volume_id)
+ LOG.debug(_('starting restore from Ceph backup=%(src)s to '
+ 'volume=%(dest)s') %
+ {'src': backup['id'], 'dest': target_volume['name']})
+
+ # Ensure we are at the beginning of the volume
+ volume_file.seek(0)
+
+ try:
+ self._try_restore(backup, target_volume, volume_file)
+
+ # Be tolerant to IO implementations that do not support fileno()
+ try:
+ fileno = volume_file.fileno()
+ except IOError:
+ LOG.info(_("volume_file does not support fileno() so skipping "
+ "fsync()"))
+ else:
+ os.fsync(fileno)
+
+ LOG.debug(_('restore finished.'))
+ except exception.BackupOperationError as e:
+ LOG.error(_('restore finished with error - %s') % (e))
+ raise
+
+ def delete(self, backup):
+ """Delete the given backup from Ceph object store."""
+ backup_id = backup['id']
+ LOG.debug(_('delete started for backup=%s') % backup['id'])
try:
- with rbddriver.RADOSClient(self) as client:
- self.rbd.RBD().remove(client.ioctx, backup_name)
+ self._try_delete_base_image(backup['id'], backup['volume_id'])
except self.rbd.ImageNotFound:
- LOG.warning("rbd image '%s' not found but continuing anyway so "
- "that db entry can be removed" % (backup_name))
+ msg = _("rbd image not found but continuing anyway so "
+ "that db entry can be removed")
+ LOG.warning(msg)
+ LOG.info(_("delete '%s' finished with warning") % (backup_id))
LOG.debug(_("delete '%s' finished") % (backup_id))
# under the License.
""" Tests for Ceph backup service """
+import eventlet
import hashlib
import os
import tempfile
+import time
import uuid
-from cinder.backup.drivers.ceph import CephBackupDriver
-from cinder.tests.backup.fake_rados import mock_rados
-from cinder.tests.backup.fake_rados import mock_rbd
-
from cinder.backup.drivers import ceph
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
+from cinder.tests.backup.fake_rados import mock_rados
+from cinder.tests.backup.fake_rados import mock_rbd
+from cinder.volume.drivers import rbd as rbddriver
LOG = logging.getLogger(__name__)
backup = {'id': backupid, 'size': size, 'volume_id': volid}
return db.backup_create(self.ctxt, backup)['id']
+ def fake_execute_w_exception(*args, **kwargs):
+ raise exception.ProcessExecutionError()
+
+ def time_inc(self):
+ self.counter += 1
+ return self.counter
+
+ def _get_wrapped_rbd_io(self, rbd_image):
+ rbd_meta = rbddriver.RBDImageMetadata(rbd_image, 'pool_foo',
+ 'user_foo', 'conf_foo')
+ return rbddriver.RBDImageIOWrapper(rbd_meta)
+
def setUp(self):
super(BackupCephTestCase, self).setUp()
self.ctxt = context.get_admin_context()
- self.vol_id = str(uuid.uuid4())
+ self.volume_id = str(uuid.uuid4())
self.backup_id = str(uuid.uuid4())
# Setup librbd stubs
self.stubs.Set(ceph, 'rados', mock_rados)
self.stubs.Set(ceph, 'rbd', mock_rbd)
- self._create_backup_db_entry(self.backup_id, self.vol_id, 1)
+ self._create_backup_db_entry(self.backup_id, self.volume_id, 1)
self.chunk_size = 1024
self.num_chunks = 128
self.volume_file.seek(0)
- def test_get_rbd_support(self):
- service = CephBackupDriver(self.ctxt)
+ # Always trigger an exception if a command is executed since it should
+ # always be dealt with gracefully. At time of writing on rbd
+ # export/import-diff is executed and if they fail we expect to find
+ # alternative means of backing up.
+ fake_exec = self.fake_execute_w_exception
+ self.service = ceph.CephBackupDriver(self.ctxt, execute=fake_exec)
+
+ # Ensure that time.time() always returns more than the last time it was
+ # called to avoid div by zero errors.
+ self.counter = float(0)
+ self.stubs.Set(time, 'time', self.time_inc)
+ self.stubs.Set(eventlet, 'sleep', lambda *args: None)
- self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING'))
- self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2'))
+ def test_get_rbd_support(self):
+ self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
+ self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
- oldformat, features = service._get_rbd_support()
+ oldformat, features = self.service._get_rbd_support()
self.assertTrue(oldformat)
self.assertEquals(features, 0)
- service.rbd.RBD_FEATURE_LAYERING = 1
+ self.service.rbd.RBD_FEATURE_LAYERING = 1
- oldformat, features = service._get_rbd_support()
+ oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
self.assertEquals(features, 1)
- service.rbd.RBD_FEATURE_STRIPINGV2 = 2
+ self.service.rbd.RBD_FEATURE_STRIPINGV2 = 2
- oldformat, features = service._get_rbd_support()
+ oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
self.assertEquals(features, 1 | 2)
- def test_tranfer_data_from_rbd(self):
- service = CephBackupDriver(self.ctxt)
+ def _set_common_backup_stubs(self, service):
+ self.stubs.Set(self.service, '_get_rbd_support', lambda: (True, 3))
+ self.stubs.Set(self.service, 'get_backup_snaps',
+ lambda *args, **kwargs: None)
+
+ def rbd_size(inst):
+ return self.chunk_size * self.num_chunks
+
+ self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+
+ def _set_common_restore_stubs(self, service):
+ self._set_common_backup_stubs(self.service)
+
+ def rbd_size(inst):
+ return self.chunk_size * self.num_chunks
+
+ self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+
+ def test_get_most_recent_snap(self):
+ last = 'backup.%s.snap.9824923.1212' % (uuid.uuid4())
+
+ def list_snaps(inst, *args):
+ return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
+ {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
+ {'name': last},
+ {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
+
+ self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+
+ snap = self.service._get_most_recent_snap(self.service.rbd.Image())
+
+ self.assertEquals(last, snap)
+
+ def test_get_backup_snap_name(self):
+ snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
+
+ def mock_get_backup_snaps(inst, *args):
+ return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4()),
+ 'backup_id': str(uuid.uuid4())},
+ {'name': snap_name,
+ 'backup_id': self.backup_id}]
+
+ self.stubs.Set(self.service, 'get_backup_snaps', lambda *args: None)
+ name = self.service._get_backup_snap_name(self.service.rbd.Image(),
+ 'base_foo',
+ self.backup_id)
+ self.assertIsNone(name)
+
+ self.stubs.Set(self.service, 'get_backup_snaps', mock_get_backup_snaps)
+ name = self.service._get_backup_snap_name(self.service.rbd.Image(),
+ 'base_foo',
+ self.backup_id)
+ self.assertEquals(name, snap_name)
+
+ def test_get_backup_snaps(self):
+
+ def list_snaps(inst, *args):
+ return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
+ {'name': 'backup.%s.wambam.6423868.2342' % (uuid.uuid4())},
+ {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
+ {'name': 'bbbackup.%s.snap.1321319.3235' % (uuid.uuid4())},
+ {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
+
+ self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+ snaps = self.service.get_backup_snaps(self.service.rbd.Image())
+ self.assertTrue(len(snaps) == 3)
+
+ def test_transfer_data_from_rbd_to_file(self):
+ self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
- self.stubs.Set(service.rbd.Image, 'read', read_data)
+ self.stubs.Set(self.service.rbd.Image, 'read', read_data)
- service._transfer_data(service.rbd.Image(), test_file, 'foo',
- self.length)
+ rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+ self.service._transfer_data(rbd_io, 'src_foo', test_file,
+ 'dest_foo', self.length)
checksum = hashlib.sha256()
test_file.seek(0)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
- def test_tranfer_data_to_rbd(self):
- service = CephBackupDriver(self.ctxt)
+ def test_transfer_data_from_rbd_to_rbd(self):
+ def rbd_size(inst):
+ return self.chunk_size * self.num_chunks
with tempfile.NamedTemporaryFile() as test_file:
+ self.volume_file.seek(0)
checksum = hashlib.sha256()
+ def read_data(inst, offset, length):
+ return self.volume_file.read(self.length)
+
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
- self.stubs.Set(service.rbd.Image, 'write', write_data)
+ self.stubs.Set(self.service.rbd.Image, 'read', read_data)
+ self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+ self.stubs.Set(self.service.rbd.Image, 'write', write_data)
- service._transfer_data(self.volume_file, service.rbd.Image(),
- 'foo', self.length, dest_is_rbd=True)
+ rbd1 = self.service.rbd.Image()
+ rbd2 = self.service.rbd.Image()
+
+ src_rbd_io = self._get_wrapped_rbd_io(rbd1)
+ dest_rbd_io = self._get_wrapped_rbd_io(rbd2)
+ self.service._transfer_data(src_rbd_io, 'src_foo', dest_rbd_io,
+ 'dest_foo', self.length)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
- def test_backup_volume_from_file(self):
- service = CephBackupDriver(self.ctxt)
+ def test_transfer_data_from_file_to_rbd(self):
+ self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
+ self.volume_file.seek(0)
checksum = hashlib.sha256()
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
- self.stubs.Set(service.rbd.Image, 'write', write_data)
+ self.stubs.Set(self.service.rbd.Image, 'write', write_data)
- service._backup_volume_from_file('foo', self.length,
- self.volume_file)
+ rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+ self.service._transfer_data(self.volume_file, 'src_foo',
+ rbd_io, 'dest_foo', self.length)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
- def tearDown(self):
- self.volume_file.close()
- super(BackupCephTestCase, self).tearDown()
+ def test_transfer_data_from_file_to_file(self):
+ self._set_common_backup_stubs(self.service)
- def test_backup_error1(self):
- service = CephBackupDriver(self.ctxt)
- backup = db.backup_get(self.ctxt, self.backup_id)
- self._create_volume_db_entry(self.vol_id, 0)
- self.assertRaises(exception.InvalidParameterValue, service.backup,
- backup, self.volume_file)
+ with tempfile.NamedTemporaryFile() as test_file:
+ self.volume_file.seek(0)
+ checksum = hashlib.sha256()
+
+ self.service._transfer_data(self.volume_file, 'src_foo', test_file,
+ 'dest_foo', self.length)
+
+ checksum = hashlib.sha256()
+ test_file.seek(0)
+ for c in xrange(0, self.num_chunks):
+ checksum.update(test_file.read(self.chunk_size))
- def test_backup_error2(self):
- service = CephBackupDriver(self.ctxt)
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def test_backup_volume_from_file(self):
+ self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
- self._create_volume_db_entry(self.vol_id, 1)
- self.assertRaises(exception.BackupVolumeInvalidType, service.backup,
- backup, None)
- def test_backup_good(self):
- service = CephBackupDriver(self.ctxt)
+ self._set_common_backup_stubs(self.service)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ checksum = hashlib.sha256()
+
+ def write_data(inst, data, offset):
+ checksum.update(data)
+ test_file.write(data)
+
+ self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+
+ self.service.backup(backup, self.volume_file)
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def test_get_backup_base_name(self):
+ name = self.service._get_backup_base_name(self.volume_id,
+ diff_format=True)
+ self.assertEquals(name, "volume-%s.backup.base" % (self.volume_id))
+
+ self.assertRaises(exception.InvalidParameterValue,
+ self.service._get_backup_base_name,
+ self.volume_id)
+
+ name = self.service._get_backup_base_name(self.volume_id, '1234')
+ self.assertEquals(name, "volume-%s.backup.%s" %
+ (self.volume_id, '1234'))
+
+ def test_backup_volume_from_rbd(self):
+ self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
- self._create_volume_db_entry(self.vol_id, 1)
+
+ self._set_common_backup_stubs(self.service)
+
+ backup_name = self.service._get_backup_base_name(self.backup_id,
+ diff_format=True)
+
+ self.stubs.Set(self.service, '_try_delete_base_image',
+ lambda *args, **kwargs: None)
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
checksum.update(data)
test_file.write(data)
- self.stubs.Set(service.rbd.Image, 'write', write_data)
+ def read_data(inst, offset, length):
+ return self.volume_file.read(self.length)
+
+ def rbd_list(inst, ioctx):
+ return [backup_name]
+
+ self.stubs.Set(self.service.rbd.Image, 'read', read_data)
+ self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+ self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+ meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(),
+ 'pool_foo', 'user_foo',
+ 'conf_foo')
+ rbd_io = rbddriver.RBDImageIOWrapper(meta)
- service.backup(backup, self.volume_file)
+ self.service.backup(backup, rbd_io)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
+ def test_backup_vol_length_0(self):
+ self._set_common_backup_stubs(self.service)
+
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ self._create_volume_db_entry(self.volume_id, 0)
+ self.assertRaises(exception.InvalidParameterValue, self.service.backup,
+ backup, self.volume_file)
+
def test_restore(self):
- service = CephBackupDriver(self.ctxt)
- self._create_volume_db_entry(self.vol_id, 1)
+ self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
+ self._set_common_restore_stubs(self.service)
+
+ backup_name = self.service._get_backup_base_name(self.backup_id,
+ diff_format=True)
+
+ def rbd_list(inst, ioctx):
+ return [backup_name]
+
+ self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
- self.stubs.Set(service.rbd.Image, 'read', read_data)
+ self.stubs.Set(self.service.rbd.Image, 'read', read_data)
- service.restore(backup, self.vol_id, test_file)
+ self.service.restore(backup, self.volume_id, test_file)
checksum = hashlib.sha256()
test_file.seek(0)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
- def test_delete(self):
- service = CephBackupDriver(self.ctxt)
- self._create_volume_db_entry(self.vol_id, 1)
+ def test_create_base_image_if_not_exists(self):
+ pass
+
+ def test_delete_backup_snapshots(self):
+ snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
+ base_name = self.service._get_backup_base_name(self.volume_id,
+ diff_format=True)
+
+ self.stubs.Set(self.service, '_get_backup_snap_name',
+ lambda *args: snap_name)
+
+ self.stubs.Set(self.service, 'get_backup_snaps',
+ lambda *args: None)
+
+ rem = self.service._delete_backup_snapshots(mock_rados(), base_name,
+ self.backup_id)
+
+ self.assertEquals(rem, 0)
+
+ def test_try_delete_base_image_diff_format(self):
+ # don't create volume db entry since it should not be required
backup = db.backup_get(self.ctxt, self.backup_id)
+ backup_name = self.service._get_backup_base_name(self.volume_id,
+ diff_format=True)
+
+ snap_name = self.service._get_new_snap_name(self.backup_id)
+ snaps = [{'name': snap_name}]
+
+ def rbd_list(*args):
+ return [backup_name]
+
+ def list_snaps(*args):
+ return snaps
+
+ def remove_snap(*args):
+ snaps.pop()
+
+ self.stubs.Set(self.service.rbd.Image, 'remove_snap', remove_snap)
+ self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+ self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
# Must be something mutable
remove_called = []
def remove(inst, ioctx, name):
remove_called.append(True)
- self.stubs.Set(service.rbd.RBD, 'remove', remove)
- service.delete(backup)
+ self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+ self.service.delete(backup)
self.assertTrue(remove_called[0])
+
+ def test_try_delete_base_image(self):
+ # don't create volume db entry since it should not be required
+ self._create_volume_db_entry(self.volume_id, 1)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ backup_name = self.service._get_backup_base_name(self.volume_id,
+ self.backup_id)
+
+ def rbd_list(inst, ioctx):
+ return [backup_name]
+
+ self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+ # Must be something mutable
+ remove_called = []
+
+ self.stubs.Set(self.service, 'get_backup_snaps',
+ lambda *args, **kwargs: None)
+
+ def remove(inst, ioctx, name):
+ remove_called.append(True)
+
+ self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+ self.service.delete(backup)
+ self.assertTrue(remove_called[0])
+
+ def test_try_delete_base_image_busy(self):
+ """This should induce retries then raise rbd.ImageBusy."""
+ # don't create volume db entry since it should not be required
+ self._create_volume_db_entry(self.volume_id, 1)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ backup_name = self.service._get_backup_base_name(self.volume_id,
+ self.backup_id)
+
+ def rbd_list(inst, ioctx):
+ return [backup_name]
+
+ self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+ # Must be something mutable
+ remove_called = []
+
+ self.stubs.Set(self.service, 'get_backup_snaps',
+ lambda *args, **kwargs: None)
+
+ def remove(inst, ioctx, name):
+ raise self.service.rbd.ImageBusy("image busy")
+
+ self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+
+ self.assertRaises(self.service.rbd.ImageBusy,
+ self.service._try_delete_base_image,
+ backup['id'], backup['volume_id'])
+
+ def test_delete(self):
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ def del_base_image(*args):
+ pass
+
+ self.stubs.Set(self.service, '_try_delete_base_image',
+ lambda *args: None)
+
+ self.service.delete(backup)
+
+ def test_delete_image_not_found(self):
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ def del_base_image(*args):
+ raise self.service.rbd.ImageNotFound
+
+ self.stubs.Set(self.service, '_try_delete_base_image',
+ lambda *args: None)
+
+ # ImageNotFound exception is caught so that db entry can be cleared
+ self.service.delete(backup)
+
+ def test_diff_restore_allowed_true(self):
+ is_allowed = (True, 'restore.foo')
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ alt_volume_id = str(uuid.uuid4())
+ self._create_volume_db_entry(alt_volume_id, 1)
+ alt_volume = db.volume_get(self.ctxt, alt_volume_id)
+ rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+
+ self.stubs.Set(self.service, '_get_restore_point',
+ lambda *args: 'restore.foo')
+
+ self.stubs.Set(self.service, '_rbd_has_extents',
+ lambda *args: False)
+
+ self.stubs.Set(self.service, '_rbd_image_exists',
+ lambda *args: (True, 'foo'))
+
+ self.stubs.Set(self.service, '_file_is_rbd', lambda *args: True)
+
+ resp = self.service._diff_restore_allowed('foo', backup, alt_volume,
+ rbd_io, mock_rados())
+ self.assertEquals(resp, is_allowed)
+
+ def test_diff_restore_allowed_false(self):
+ not_allowed = (False, None)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ self._create_volume_db_entry(self.volume_id, 1)
+ original_volume = db.volume_get(self.ctxt, self.volume_id)
+ rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+
+ self.stubs.Set(self.service, '_get_restore_point',
+ lambda *args: None)
+
+ self.stubs.Set(self.service, '_rbd_has_extents',
+ lambda *args: True)
+
+ self.stubs.Set(self.service, '_rbd_image_exists',
+ lambda *args: (False, 'foo'))
+
+ self.stubs.Set(self.service, '_file_is_rbd', lambda *args: False)
+
+ resp = self.service._diff_restore_allowed('foo', backup,
+ original_volume, rbd_io,
+ mock_rados())
+ self.assertEquals(resp, not_allowed)
+
+ def tearDown(self):
+ self.volume_file.close()
+ self.stubs.UnsetAll()
+ super(BackupCephTestCase, self).tearDown()