# it is large enough for the backup
if volume_id is None:
name = 'restore_backup_%s' % backup_id
- description = 'auto-created_from_restore_from_swift'
+ description = 'auto-created_from_restore_from_backup'
LOG.audit(_("Creating volume of %(size)s GB for restore of "
"backup %(backup_id)s"),
"""
Backup manager manages volume backups.
-Volume Backups are full copies of persistent volumes stored in Swift object
-storage. They are usable without the original object being available. A
+Volume Backups are full copies of persistent volumes stored in a backup
+store, e.g. an object store or any other supported backend. They are usable
+without the original object being available. A
volume backup can be restored to the original volume it was created from or
any other available volume with a minimum size of the original volume.
Volume backups can be created, restored, deleted and listed.
--- /dev/null
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Ceph Backup Service Implementation"""
+
+import os
+import time
+
+import eventlet
+from oslo.config import cfg
+
+from cinder.db import base
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import units
+import cinder.volume.drivers.rbd as rbddriver
+
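+# The rados and rbd libraries may not be importable on hosts that do not run
+# this backup service (and are stubbed out in the unit tests), so tolerate
+# their absence at import time.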
+try:
+ import rados
+ import rbd
+except ImportError:
+ rados = None
+ rbd = None
+
+LOG = logging.getLogger(__name__)
+
+service_opts = [
+ cfg.StrOpt('backup_ceph_conf', default='/etc/ceph/ceph.conf',
+ help='Ceph config file to use.'),
+ cfg.StrOpt('backup_ceph_user', default='cinder',
+ help='the Ceph user to connect with'),
+    cfg.IntOpt('backup_ceph_chunk_size', default=(units.MiB * 128),
+               help='the chunk size in bytes that a backup will be broken '
+               'into before transfer to backup store'),
+ cfg.StrOpt('backup_ceph_pool', default='backups',
+ help='the Ceph pool to backup to'),
+    cfg.IntOpt('backup_ceph_stripe_unit', default=0,
+               help='RBD stripe unit to use when creating a backup image'),
+    cfg.IntOpt('backup_ceph_stripe_count', default=0,
+               help='RBD stripe count to use when creating a backup image')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(service_opts)
+
+
+class CephBackupService(base.Base):
+ """Backup up Cinder volumes to Ceph Object Store"""
+
+ def __init__(self, context, db_driver=None):
+ super(CephBackupService, self).__init__(db_driver)
+ self.rbd = rbd
+ self.rados = rados
+ self.context = context
+ self.chunk_size = CONF.backup_ceph_chunk_size
+ if self._supports_stripingv2():
+ self.rbd_stripe_unit = int(CONF.backup_ceph_stripe_unit)
+ self.rbd_stripe_count = int(CONF.backup_ceph_stripe_count)
+ else:
+ LOG.info("rbd striping not supported - ignoring conf settings "
+ "for rbd striping")
+ self.rbd_stripe_count = 0
+ self.rbd_stripe_unit = 0
+
+ self._ceph_user = str(CONF.backup_ceph_user)
+ self._ceph_pool = str(CONF.backup_ceph_pool)
+ self._ceph_conf = str(CONF.backup_ceph_conf)
+
+ def _supports_layering(self):
+ """
+ Determine whether copy-on-write is supported by our version of librbd
+ """
+ return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
+
+ def _supports_stripingv2(self):
+ """
+ Determine whether striping is supported by our version of librbd
+ """
+ return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2')
+
+ def _get_rbd_support(self):
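+        """Determine the format and features to use for backup rbd images.
+
+        Returns an (old_format, features) tuple suitable for passing to
+        rbd.RBD().create().
+        """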
+ old_format = True
+ features = 0
+ if self._supports_layering():
+ old_format = False
+ features |= self.rbd.RBD_FEATURE_LAYERING
+ if self._supports_stripingv2():
+ old_format = False
+ features |= self.rbd.RBD_FEATURE_STRIPINGV2
+
+ return (old_format, features)
+
+ def _connect_to_rados(self, pool=None):
+ """Establish connection to the Ceph cluster"""
+ client = self.rados.Rados(rados_id=self._ceph_user,
+ conffile=self._ceph_conf)
+ try:
+ client.connect()
+ pool_to_open = str(pool or self._ceph_pool)
+ ioctx = client.open_ioctx(pool_to_open)
+ return client, ioctx
+ except self.rados.Error:
+ # shutdown cannot raise an exception
+ client.shutdown()
+ raise
+
+ def _disconnect_from_rados(self, client, ioctx):
+ """Terminate connection with the Ceph cluster"""
+ # closing an ioctx cannot raise an exception
+ ioctx.close()
+ client.shutdown()
+
+ def _get_backup_rbd_name(self, vol_name, backup_id):
+ """Make sure we use a consistent format for backup names"""
+ # ensure no unicode
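+        # e.g. 'volume-<id>.backup.<backup id>' with the default volume
+        # naming template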
+ return str("%s.backup.%s" % (vol_name, backup_id))
+
+ def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False):
+ """
+ Transfer data between file and rbd. If destination is rbd, source is
+ assumed to be file, otherwise source is assumed to be rbd.
+ """
+ chunks = int(length / self.chunk_size)
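+        # Transfer whole chunks first; any remainder smaller than chunk_size
+        # is handled after the loop.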
+ LOG.debug("transferring %s chunks of %s bytes to '%s'" %
+ (chunks, self.chunk_size, dest_name))
+ for chunk in xrange(0, chunks):
+ offset = chunk * self.chunk_size
+ before = time.time()
+
+ if dest_is_rbd:
+ dest.write(src.read(self.chunk_size), offset)
+                # note(dosaboy): librbd writes are synchronous so flush()
+                # would have no effect. Also, flush is only supported in more
+                # recent versions of librbd.
+ else:
+ dest.write(src.read(offset, self.chunk_size))
+ dest.flush()
+
+ delta = (time.time() - before)
+ rate = (self.chunk_size / delta) / 1024
+ LOG.debug("transferred chunk %s of %s (%dK/s)" %
+ (chunk, chunks, rate))
+
+ # yield to any other pending backups
+ eventlet.sleep(0)
+
+ rem = int(length % self.chunk_size)
+ if rem:
+ LOG.debug("transferring remaining %s bytes" % (rem))
+ offset = (length - rem)
+ if dest_is_rbd:
+ dest.write(src.read(rem), offset)
+                # note(dosaboy): librbd writes are synchronous so flush()
+                # would have no effect. Also, flush is only supported in more
+                # recent versions of librbd.
+ else:
+ dest.write(src.read(offset, rem))
+ dest.flush()
+
+ # yield to any other pending backups
+ eventlet.sleep(0)
+
+ def _backup_volume_from_file(self, backup_name, backup_size, volume_file):
+ """Backup a volume from file stream"""
+ LOG.debug("performing backup from file")
+
+ old_format, features = self._get_rbd_support()
+
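+        # Create an rbd image in the backup pool to receive the volume data,
+        # using the most capable image format/features available.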
+ with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+ self.rbd.RBD().create(ioctx=client.ioctx,
+ name=backup_name,
+ size=backup_size,
+ old_format=old_format,
+ features=features,
+ stripe_unit=self.rbd_stripe_unit,
+ stripe_count=self.rbd_stripe_count)
+
+ dest_rbd = self.rbd.Image(client.ioctx, backup_name)
+ try:
+ self._transfer_data(volume_file, dest_rbd, backup_name,
+ backup_size, dest_is_rbd=True)
+ finally:
+ dest_rbd.close()
+
+ def backup(self, backup, volume_file):
+ """Backup the given volume to Ceph object store"""
+ backup_id = backup['id']
+ volume = self.db.volume_get(self.context, backup['volume_id'])
+ backup_name = self._get_backup_rbd_name(volume['name'], backup_id)
+
+ LOG.debug("Starting backup of volume='%s' to rbd='%s'" %
+ (volume['name'], backup_name))
+
+ if int(volume['size']) == 0:
+ raise exception.InvalidParameterValue("need non-zero volume size")
+ else:
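+            # Volume size is stored in GB so convert to bytes for the backup
+            # rbd image.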
+ backup_size = int(volume['size']) * units.GiB
+
+ if volume_file:
+ self._backup_volume_from_file(backup_name, backup_size,
+ volume_file)
+ else:
+ errmsg = ("No volume_file was provided so I cannot do requested "
+ "backup (id=%s)" % (backup_id))
+ raise exception.BackupVolumeInvalidType(errmsg)
+
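+        # Record the Ceph pool used as the backup's 'container' in the
+        # backup record.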
+ self.db.backup_update(self.context, backup['id'],
+ {'container': self._ceph_pool})
+
+ LOG.debug(_("backup '%s' finished.") % (backup_id))
+
+    def restore(self, backup, volume_id, volume_file):
+        """Restore the given volume backup from Ceph object store"""
+        # The backup rbd image is named after the volume the backup was
+        # taken from, so use that volume's record to locate the backup.
+        src_volume_id = backup['volume_id']
+        volume = self.db.volume_get(self.context, src_volume_id)
+        backup_name = self._get_backup_rbd_name(volume['name'], backup['id'])
+
+        LOG.debug('starting restore from Ceph backup=%s '
+                  'to volume=%s' % (backup['id'], volume_id))
+
+ # Ensure we are at the beginning of the volume
+ volume_file.seek(0)
+
+ backup_size = int(volume['size']) * units.GiB
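+        # The backup image covers the whole original volume, so that is the
+        # amount of data to transfer back.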
+
+ with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+ src_rbd = self.rbd.Image(client.ioctx, backup_name)
+ try:
+ self._transfer_data(src_rbd, volume_file, volume['name'],
+ backup_size)
+ finally:
+ src_rbd.close()
+
+        # Be tolerant of IO implementations that do not support fileno()
+        try:
+            fileno = volume_file.fileno()
+        except (AttributeError, IOError):
+ LOG.info("volume_file does not support fileno() so skipping "
+ "fsync()")
+ else:
+ os.fsync(fileno)
+
+ LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id))
+
+ def delete(self, backup):
+ """Delete the given backup from Ceph object store"""
+ backup_id = backup['id']
+ volume_id = backup['volume_id']
+ volume = self.db.volume_get(self.context, volume_id)
+ backup_name = self._get_backup_rbd_name(volume['name'], backup_id)
+
+ LOG.debug('delete started for backup=%s', backup['id'])
+
+ try:
+            with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+ self.rbd.RBD().remove(client.ioctx, backup_name)
+ except self.rbd.ImageNotFound:
+ LOG.warning("rbd image '%s' not found but continuing anyway so "
+ "that db entry can be removed" % (backup_name))
+
+ LOG.debug(_("delete '%s' finished") % (backup_id))
+
+
+def get_backup_service(context):
+ return CephBackupService(context)
message = _("Failed to copy image to volume: %(reason)s")
+class BackupVolumeInvalidType(Invalid):
+ message = _("Backup volume %(volume_id)s type not recognised.")
+
+
class BackupNotFound(NotFound):
message = _("Backup %(backup_id)s could not be found.")
--- /dev/null
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class mock_rados(object):
+
+ class mock_ioctx(object):
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def close(self, *args, **kwargs):
+ pass
+
+ class Rados(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def connect(self, *args, **kwargs):
+ pass
+
+ def open_ioctx(self, *args, **kwargs):
+ return mock_rados.mock_ioctx()
+
+ def shutdown(self, *args, **kwargs):
+ pass
+
+    class Error(Exception):
+ def __init__(self, *args, **kwargs):
+ pass
+
+
+class mock_rbd(object):
+
+ class Image(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def read(self, *args, **kwargs):
+ pass
+
+ def write(self, *args, **kwargs):
+ pass
+
+ def resize(self, *args, **kwargs):
+ pass
+
+ def close(self, *args, **kwargs):
+ pass
+
+ class RBD(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def create(self, *args, **kwargs):
+ pass
+
+ def remove(self, *args, **kwargs):
+ pass
+
+ class ImageNotFound(Exception):
+ def __init__(self, *args, **kwargs):
+ pass
--- /dev/null
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+""" Tests for Ceph backup service """
+
+import hashlib
+import os
+import tempfile
+import uuid
+
+from cinder.backup.services import ceph
+from cinder.backup.services.ceph import CephBackupService
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import test
+from cinder.tests.backup.fake_rados import mock_rados
+from cinder.tests.backup.fake_rados import mock_rbd
+
+LOG = logging.getLogger(__name__)
+
+
+class BackupCephTestCase(test.TestCase):
+ """Test Case for backup to Ceph object store"""
+
+ def _create_volume_db_entry(self, id, size):
+ vol = {'id': id, 'size': size, 'status': 'available'}
+ return db.volume_create(self.ctxt, vol)['id']
+
+ def _create_backup_db_entry(self, backupid, volid, size):
+ backup = {'id': backupid, 'size': size, 'volume_id': volid}
+ return db.backup_create(self.ctxt, backup)['id']
+
+ def setUp(self):
+ super(BackupCephTestCase, self).setUp()
+ self.ctxt = context.get_admin_context()
+
+ self.vol_id = str(uuid.uuid4())
+ self.backup_id = str(uuid.uuid4())
+
+ # Setup librbd stubs
+ self.stubs.Set(ceph, 'rados', mock_rados)
+ self.stubs.Set(ceph, 'rbd', mock_rbd)
+
+ self._create_backup_db_entry(self.backup_id, self.vol_id, 1)
+
+ self.chunk_size = 1024
+ self.num_chunks = 128
+ self.length = self.num_chunks * self.chunk_size
+
+ self.checksum = hashlib.sha256()
+
+ # Create a file with some data in it
+ self.volume_file = tempfile.NamedTemporaryFile()
+ for i in xrange(0, self.num_chunks):
+ data = os.urandom(self.chunk_size)
+ self.checksum.update(data)
+ self.volume_file.write(data)
+
+ self.volume_file.seek(0)
+
+ def test_get_rbd_support(self):
+ service = CephBackupService(self.ctxt)
+
+ self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING'))
+ self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2'))
+
+ oldformat, features = service._get_rbd_support()
+ self.assertTrue(oldformat)
+ self.assertEquals(features, 0)
+
+ service.rbd.RBD_FEATURE_LAYERING = 1
+
+ oldformat, features = service._get_rbd_support()
+ self.assertFalse(oldformat)
+ self.assertEquals(features, 1)
+
+ service.rbd.RBD_FEATURE_STRIPINGV2 = 2
+
+ oldformat, features = service._get_rbd_support()
+ self.assertFalse(oldformat)
+ self.assertEquals(features, 1 | 2)
+
+    def test_transfer_data_from_rbd(self):
+ service = CephBackupService(self.ctxt)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ self.volume_file.seek(0)
+
+ def read_data(inst, offset, length):
+ return self.volume_file.read(self.length)
+
+ self.stubs.Set(service.rbd.Image, 'read', read_data)
+
+ service._transfer_data(service.rbd.Image(), test_file, 'foo',
+ self.length)
+
+ checksum = hashlib.sha256()
+ test_file.seek(0)
+ for c in xrange(0, self.num_chunks):
+ checksum.update(test_file.read(self.chunk_size))
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_transfer_data_to_rbd(self):
+ service = CephBackupService(self.ctxt)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ checksum = hashlib.sha256()
+
+ def write_data(inst, data, offset):
+ checksum.update(data)
+ test_file.write(data)
+
+ self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+ service._transfer_data(self.volume_file, service.rbd.Image(),
+ 'foo', self.length, dest_is_rbd=True)
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def test_backup_volume_from_file(self):
+ service = CephBackupService(self.ctxt)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ checksum = hashlib.sha256()
+
+ def write_data(inst, data, offset):
+ checksum.update(data)
+ test_file.write(data)
+
+ self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+ service._backup_volume_from_file('foo', self.length,
+ self.volume_file)
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def tearDown(self):
+ self.volume_file.close()
+ super(BackupCephTestCase, self).tearDown()
+
+ def test_backup_error1(self):
+ service = CephBackupService(self.ctxt)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ self._create_volume_db_entry(self.vol_id, 0)
+ self.assertRaises(exception.InvalidParameterValue, service.backup,
+ backup, self.volume_file)
+
+ def test_backup_error2(self):
+ service = CephBackupService(self.ctxt)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ self._create_volume_db_entry(self.vol_id, 1)
+ self.assertRaises(exception.BackupVolumeInvalidType, service.backup,
+ backup, None)
+
+ def test_backup_good(self):
+ service = CephBackupService(self.ctxt)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+ self._create_volume_db_entry(self.vol_id, 1)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ checksum = hashlib.sha256()
+
+ def write_data(inst, data, offset):
+ checksum.update(data)
+ test_file.write(data)
+
+ self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+ service.backup(backup, self.volume_file)
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def test_restore(self):
+ service = CephBackupService(self.ctxt)
+ self._create_volume_db_entry(self.vol_id, 1)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ with tempfile.NamedTemporaryFile() as test_file:
+ self.volume_file.seek(0)
+
+ def read_data(inst, offset, length):
+ return self.volume_file.read(self.length)
+
+ self.stubs.Set(service.rbd.Image, 'read', read_data)
+
+ service.restore(backup, self.vol_id, test_file)
+
+ checksum = hashlib.sha256()
+ test_file.seek(0)
+ for c in xrange(0, self.num_chunks):
+ checksum.update(test_file.read(self.chunk_size))
+
+ # Ensure the files are equal
+ self.assertEquals(checksum.digest(), self.checksum.digest())
+
+ def test_delete(self):
+ service = CephBackupService(self.ctxt)
+ self._create_volume_db_entry(self.vol_id, 1)
+ backup = db.backup_get(self.ctxt, self.backup_id)
+
+ # Must be something mutable
+ remove_called = []
+
+ def remove(inst, ioctx, name):
+ remove_called.append(True)
+
+ self.stubs.Set(service.rbd.RBD, 'remove', remove)
+ service.delete(backup)
+ self.assertTrue(remove_called[0])
#backup_compression_algorithm=zlib
+#
+# Options defined in cinder.backup.services.ceph
+#
+
+# The configuration file to use for the backup cluster (string value)
+#backup_ceph_conf=/etc/ceph/ceph.conf
+
+# The Ceph user with permissions to access the backup pool (string value)
+#backup_ceph_user=cinder
+
+# The RADOS pool in which volume backups are stored (string value)
+#backup_ceph_pool=backups
+
+# The RBD stripe unit to use when creating a backup image (integer value)
+#backup_ceph_stripe_unit=0
+
+# The RBD stripe count to use when creating a backup image (integer value)
+#backup_ceph_stripe_count=0
+
+# The chunk size, in bytes, that a backup will be broken into before
+# transfer to the Ceph object store (integer value)
+#backup_ceph_chunk_size=134217728
+
#
# Options defined in cinder.db.api
#