]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Added Cinder volume backup to Ceph support
authorEdward Hope-Morley <edward.hope-morley@canonical.com>
Tue, 25 Jun 2013 09:11:11 +0000 (10:11 +0100)
committerEdward Hope-Morley <edward.hope-morley@canonical.com>
Mon, 1 Jul 2013 09:06:42 +0000 (10:06 +0100)
Added new Ceph backup service to allow backup
of Cinder volumes to a Ceph object store. This
driver is compatible with the existing backup
interface provided by the Swift backup service.

Implements: blueprint cinder-backup-to-ceph

Change-Id: I299f033347cb263e7169c4a4efb758d19e753f46

cinder/backup/api.py
cinder/backup/manager.py
cinder/backup/services/ceph.py [new file with mode: 0644]
cinder/exception.py
cinder/tests/backup/fake_rados.py [new file with mode: 0644]
cinder/tests/test_backup_ceph.py [new file with mode: 0644]
etc/cinder/cinder.conf.sample

index 325f86231aaf85b3f950ab39b2b3444dcfdfc3c0..644de210cb06d2a0b996ff6d3cc35e78923e735e 100644 (file)
@@ -133,7 +133,7 @@ class API(base.Base):
         # it is large enough for the backup
         if volume_id is None:
             name = 'restore_backup_%s' % backup_id
-            description = 'auto-created_from_restore_from_swift'
+            description = 'auto-created_from_restore_from_backup'
 
             LOG.audit(_("Creating volume of %(size)s GB for restore of "
                         "backup %(backup_id)s"),
index 08b9d49420211198371fc2a389320473165c0c11..4714454670131d8dd8cb4eccfb5dd69024eb2f04 100755 (executable)
@@ -16,8 +16,9 @@
 """
 Backup manager manages volume backups.
 
-Volume Backups are full copies of persistent volumes stored in Swift object
-storage. They are usable without the original object being available. A
+Volume Backups are full copies of persistent volumes stored in a backup
+store e.g. an object store or any other backup store if and when support is
+added. They are usable without the original object being available. A
 volume backup can be restored to the original volume it was created from or
 any other available volume with a minimum size of the original volume.
 Volume backups can be created, restored, deleted and listed.
diff --git a/cinder/backup/services/ceph.py b/cinder/backup/services/ceph.py
new file mode 100644 (file)
index 0000000..9ff65c1
--- /dev/null
@@ -0,0 +1,275 @@
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Ceph Backup Service Implementation"""
+
+from cinder.db import base
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import units
+import cinder.volume.drivers.rbd as rbddriver
+import eventlet
+import os
+from oslo.config import cfg
+import time
+
+try:
+    import rados
+    import rbd
+except ImportError:
+    rados = None
+    rbd = None
+
+LOG = logging.getLogger(__name__)
+
+service_opts = [
+    cfg.StrOpt('backup_ceph_conf', default='/etc/ceph/ceph.conf',
+               help='Ceph config file to use.'),
+    cfg.StrOpt('backup_ceph_user', default='cinder',
+               help='the Ceph user to connect with'),
+    cfg.StrOpt('backup_ceph_chunk_size', default=(units.MiB * 128),
+               help='the chunk size in bytes that a backup will be broken '
+                    'into before transfer to backup store'),
+    cfg.StrOpt('backup_ceph_pool', default='backups',
+               help='the Ceph pool to backup to'),
+    cfg.StrOpt('backup_ceph_stripe_unit', default=0,
+               help='RBD stripe unit to use when creating a backup image'),
+    cfg.StrOpt('backup_ceph_stripe_count', default=0,
+               help='RBD stripe count to use when creating a backup image')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(service_opts)
+
+
+class CephBackupService(base.Base):
+    """Back up Cinder volumes to Ceph Object Store"""
+
+    def __init__(self, context, db_driver=None):
+        super(CephBackupService, self).__init__(db_driver)
+        self.rbd = rbd
+        self.rados = rados
+        self.context = context
+        self.chunk_size = CONF.backup_ceph_chunk_size
+        if self._supports_stripingv2():
+            self.rbd_stripe_unit = int(CONF.backup_ceph_stripe_unit)
+            self.rbd_stripe_count = int(CONF.backup_ceph_stripe_count)
+        else:
+            LOG.info("rbd striping not supported - ignoring conf settings "
+                     "for rbd striping")
+            self.rbd_stripe_count = 0
+            self.rbd_stripe_unit = 0
+
+        self._ceph_user = str(CONF.backup_ceph_user)
+        self._ceph_pool = str(CONF.backup_ceph_pool)
+        self._ceph_conf = str(CONF.backup_ceph_conf)
+
+    def _supports_layering(self):
+        """
+        Determine whether copy-on-write is supported by our version of librbd
+        """
+        return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
+
+    def _supports_stripingv2(self):
+        """
+        Determine whether striping is supported by our version of librbd
+        """
+        return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2')
+
+    def _get_rbd_support(self):
+        old_format = True
+        features = 0
+        if self._supports_layering():
+            old_format = False
+            features |= self.rbd.RBD_FEATURE_LAYERING
+        if self._supports_stripingv2():
+            old_format = False
+            features |= self.rbd.RBD_FEATURE_STRIPINGV2
+
+        return (old_format, features)
+
+    def _connect_to_rados(self, pool=None):
+        """Establish connection to the Ceph cluster"""
+        client = self.rados.Rados(rados_id=self._ceph_user,
+                                  conffile=self._ceph_conf)
+        try:
+            client.connect()
+            pool_to_open = str(pool or self._ceph_pool)
+            ioctx = client.open_ioctx(pool_to_open)
+            return client, ioctx
+        except self.rados.Error:
+            # shutdown cannot raise an exception
+            client.shutdown()
+            raise
+
+    def _disconnect_from_rados(self, client, ioctx):
+        """Terminate connection with the Ceph cluster"""
+        # closing an ioctx cannot raise an exception
+        ioctx.close()
+        client.shutdown()
+
+    def _get_backup_rbd_name(self, vol_name, backup_id):
+        """Make sure we use a consistent format for backup names"""
+        # ensure no unicode
+        return str("%s.backup.%s" % (vol_name, backup_id))
+
+    def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False):
+        """
+        Transfer data between file and rbd. If destination is rbd, source is
+        assumed to be file, otherwise source is assumed to be rbd.
+        """
+        chunks = int(length / self.chunk_size)
+        LOG.debug("transferring %s chunks of %s bytes to '%s'" %
+                  (chunks, self.chunk_size, dest_name))
+        for chunk in xrange(0, chunks):
+            offset = chunk * self.chunk_size
+            before = time.time()
+
+            if dest_is_rbd:
+                dest.write(src.read(self.chunk_size), offset)
+                # note(dosaboy): librbd writes are synchronous so flush() will
+                # have no effect. Also, flush is only supported in more recent
+                # versions of librbd.
+            else:
+                dest.write(src.read(offset, self.chunk_size))
+                dest.flush()
+
+            delta = (time.time() - before)
+            rate = (self.chunk_size / delta) / 1024
+            LOG.debug("transferred chunk %s of %s (%dK/s)" %
+                      (chunk, chunks, rate))
+
+            # yield to any other pending backups
+            eventlet.sleep(0)
+
+        rem = int(length % self.chunk_size)
+        if rem:
+            LOG.debug("transferring remaining %s bytes" % (rem))
+            offset = (length - rem)
+            if dest_is_rbd:
+                dest.write(src.read(rem), offset)
+                # note(dosaboy): librbd writes are synchronous so flush() will
+                # have no effect. Also, flush is only supported in more recent
+                # versions of librbd.
+            else:
+                dest.write(src.read(offset, rem))
+                dest.flush()
+
+            # yield to any other pending backups
+            eventlet.sleep(0)
+
+    def _backup_volume_from_file(self, backup_name, backup_size, volume_file):
+        """Backup a volume from file stream"""
+        LOG.debug("performing backup from file")
+
+        old_format, features = self._get_rbd_support()
+
+        with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+            self.rbd.RBD().create(ioctx=client.ioctx,
+                                  name=backup_name,
+                                  size=backup_size,
+                                  old_format=old_format,
+                                  features=features,
+                                  stripe_unit=self.rbd_stripe_unit,
+                                  stripe_count=self.rbd_stripe_count)
+
+            dest_rbd = self.rbd.Image(client.ioctx, backup_name)
+            try:
+                self._transfer_data(volume_file, dest_rbd, backup_name,
+                                    backup_size, dest_is_rbd=True)
+            finally:
+                dest_rbd.close()
+
+    def backup(self, backup, volume_file):
+        """Backup the given volume to Ceph object store"""
+        backup_id = backup['id']
+        volume = self.db.volume_get(self.context, backup['volume_id'])
+        backup_name = self._get_backup_rbd_name(volume['name'], backup_id)
+
+        LOG.debug("Starting backup of volume='%s' to rbd='%s'" %
+                  (volume['name'], backup_name))
+
+        if int(volume['size']) == 0:
+            raise exception.InvalidParameterValue("need non-zero volume size")
+        else:
+            backup_size = int(volume['size']) * units.GiB
+
+        if volume_file:
+            self._backup_volume_from_file(backup_name, backup_size,
+                                          volume_file)
+        else:
+            errmsg = ("No volume_file was provided so I cannot do requested "
+                      "backup (id=%s)" % (backup_id))
+            raise exception.BackupVolumeInvalidType(errmsg)
+
+        self.db.backup_update(self.context, backup['id'],
+                              {'container': self._ceph_pool})
+
+        LOG.debug(_("backup '%s' finished.") % (backup_id))
+
+    def restore(self, backup, volume_id, volume_file):
+        """Restore the given volume backup from Ceph object store"""
+        volume_id = backup['volume_id']
+        volume = self.db.volume_get(self.context, volume_id)
+        backup_name = self._get_backup_rbd_name(volume['name'], backup['id'])
+
+        LOG.debug('starting backup restore from Ceph backup=%s '
+                  'to volume=%s' % (backup['id'], volume['name']))
+
+        # Ensure we are at the beginning of the volume
+        volume_file.seek(0)
+
+        backup_size = int(volume['size']) * units.GiB
+
+        with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+            src_rbd = self.rbd.Image(client.ioctx, backup_name)
+            try:
+                self._transfer_data(src_rbd, volume_file, volume['name'],
+                                    backup_size)
+            finally:
+                src_rbd.close()
+
+        # Be tolerant to IO implementations that do not support fileno()
+        try:
+            fileno = volume_file.fileno()
+        except IOError:
+            LOG.info("volume_file does not support fileno() so skipping "
+                     "fsync()")
+        else:
+            os.fsync(fileno)
+
+        LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id))
+
+    def delete(self, backup):
+        """Delete the given backup from Ceph object store"""
+        backup_id = backup['id']
+        volume_id = backup['volume_id']
+        volume = self.db.volume_get(self.context, volume_id)
+        backup_name = self._get_backup_rbd_name(volume['name'], backup_id)
+
+        LOG.debug('delete started for backup=%s', backup['id'])
+
+        try:
+            with rbddriver.RADOSClient(self) as client:
+                self.rbd.RBD().remove(client.ioctx, backup_name)
+        except self.rbd.ImageNotFound:
+            LOG.warning("rbd image '%s' not found but continuing anyway so "
+                        "that db entry can be removed" % (backup_name))
+
+        LOG.debug(_("delete '%s' finished") % (backup_id))
+
+
+def get_backup_service(context):
+    return CephBackupService(context)
index ccfc95c19c5247926866c86ca658f64fc2f6fe00..ac0cc157a30916598b5a32abde1ef92bd5148c39 100644 (file)
@@ -564,6 +564,10 @@ class ImageCopyFailure(Invalid):
     message = _("Failed to copy image to volume: %(reason)s")
 
 
+class BackupVolumeInvalidType(Invalid):
+    message = _("Backup volume %(volume_id)s type not recognised.")
+
+
 class BackupNotFound(NotFound):
     message = _("Backup %(backup_id)s could not be found.")
 
diff --git a/cinder/tests/backup/fake_rados.py b/cinder/tests/backup/fake_rados.py
new file mode 100644 (file)
index 0000000..2169cc0
--- /dev/null
@@ -0,0 +1,77 @@
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+class mock_rados(object):
+
+    class mock_ioctx(object):
+        def __init__(self, *args, **kwargs):
+            pass
+
+        def close(self, *args, **kwargs):
+            pass
+
+    class Rados(object):
+
+        def __init__(self, *args, **kwargs):
+            pass
+
+        def connect(self, *args, **kwargs):
+            pass
+
+        def open_ioctx(self, *args, **kwargs):
+            return mock_rados.mock_ioctx()
+
+        def shutdown(self, *args, **kwargs):
+            pass
+
+    class Error():
+        def __init__(self, *args, **kwargs):
+            pass
+
+
+class mock_rbd(object):
+
+    class Image(object):
+
+        def __init__(self, *args, **kwargs):
+            pass
+
+        def read(self, *args, **kwargs):
+            pass
+
+        def write(self, *args, **kwargs):
+            pass
+
+        def resize(self, *args, **kwargs):
+            pass
+
+        def close(self, *args, **kwargs):
+            pass
+
+    class RBD(object):
+
+        def __init__(self, *args, **kwargs):
+            pass
+
+        def create(self, *args, **kwargs):
+            pass
+
+        def remove(self, *args, **kwargs):
+            pass
+
+    class ImageNotFound(Exception):
+        def __init__(self, *args, **kwargs):
+            pass
diff --git a/cinder/tests/test_backup_ceph.py b/cinder/tests/test_backup_ceph.py
new file mode 100644 (file)
index 0000000..b65e474
--- /dev/null
@@ -0,0 +1,228 @@
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+""" Tests for Ceph backup service """
+
+import hashlib
+import os
+import tempfile
+import uuid
+
+from cinder.backup.services.ceph import CephBackupService
+from cinder.tests.backup.fake_rados import mock_rados
+from cinder.tests.backup.fake_rados import mock_rbd
+
+from cinder.backup.services import ceph
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import log as logging
+from cinder import test
+
+LOG = logging.getLogger(__name__)
+
+
+class BackupCephTestCase(test.TestCase):
+    """Test Case for backup to Ceph object store"""
+
+    def _create_volume_db_entry(self, id, size):
+        vol = {'id': id, 'size': size, 'status': 'available'}
+        return db.volume_create(self.ctxt, vol)['id']
+
+    def _create_backup_db_entry(self, backupid, volid, size):
+        backup = {'id': backupid, 'size': size, 'volume_id': volid}
+        return db.backup_create(self.ctxt, backup)['id']
+
+    def setUp(self):
+        super(BackupCephTestCase, self).setUp()
+        self.ctxt = context.get_admin_context()
+
+        self.vol_id = str(uuid.uuid4())
+        self.backup_id = str(uuid.uuid4())
+
+        # Setup librbd stubs
+        self.stubs.Set(ceph, 'rados', mock_rados)
+        self.stubs.Set(ceph, 'rbd', mock_rbd)
+
+        self._create_backup_db_entry(self.backup_id, self.vol_id, 1)
+
+        self.chunk_size = 1024
+        self.num_chunks = 128
+        self.length = self.num_chunks * self.chunk_size
+
+        self.checksum = hashlib.sha256()
+
+        # Create a file with some data in it
+        self.volume_file = tempfile.NamedTemporaryFile()
+        for i in xrange(0, self.num_chunks):
+            data = os.urandom(self.chunk_size)
+            self.checksum.update(data)
+            self.volume_file.write(data)
+
+        self.volume_file.seek(0)
+
+    def test_get_rbd_support(self):
+        service = CephBackupService(self.ctxt)
+
+        self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING'))
+        self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2'))
+
+        oldformat, features = service._get_rbd_support()
+        self.assertTrue(oldformat)
+        self.assertEquals(features, 0)
+
+        service.rbd.RBD_FEATURE_LAYERING = 1
+
+        oldformat, features = service._get_rbd_support()
+        self.assertFalse(oldformat)
+        self.assertEquals(features, 1)
+
+        service.rbd.RBD_FEATURE_STRIPINGV2 = 2
+
+        oldformat, features = service._get_rbd_support()
+        self.assertFalse(oldformat)
+        self.assertEquals(features, 1 | 2)
+
+    def test_tranfer_data_from_rbd(self):
+        service = CephBackupService(self.ctxt)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            self.volume_file.seek(0)
+
+            def read_data(inst, offset, length):
+                return self.volume_file.read(self.length)
+
+            self.stubs.Set(service.rbd.Image, 'read', read_data)
+
+            service._transfer_data(service.rbd.Image(), test_file, 'foo',
+                                   self.length)
+
+            checksum = hashlib.sha256()
+            test_file.seek(0)
+            for c in xrange(0, self.num_chunks):
+                checksum.update(test_file.read(self.chunk_size))
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_tranfer_data_to_rbd(self):
+        service = CephBackupService(self.ctxt)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            checksum = hashlib.sha256()
+
+            def write_data(inst, data, offset):
+                checksum.update(data)
+                test_file.write(data)
+
+            self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+            service._transfer_data(self.volume_file, service.rbd.Image(),
+                                   'foo', self.length, dest_is_rbd=True)
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_backup_volume_from_file(self):
+        service = CephBackupService(self.ctxt)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            checksum = hashlib.sha256()
+
+            def write_data(inst, data, offset):
+                checksum.update(data)
+                test_file.write(data)
+
+            self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+            service._backup_volume_from_file('foo', self.length,
+                                             self.volume_file)
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def tearDown(self):
+        self.volume_file.close()
+        super(BackupCephTestCase, self).tearDown()
+
+    def test_backup_error1(self):
+        service = CephBackupService(self.ctxt)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        self._create_volume_db_entry(self.vol_id, 0)
+        self.assertRaises(exception.InvalidParameterValue, service.backup,
+                          backup, self.volume_file)
+
+    def test_backup_error2(self):
+        service = CephBackupService(self.ctxt)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        self._create_volume_db_entry(self.vol_id, 1)
+        self.assertRaises(exception.BackupVolumeInvalidType, service.backup,
+                          backup, None)
+
+    def test_backup_good(self):
+        service = CephBackupService(self.ctxt)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        self._create_volume_db_entry(self.vol_id, 1)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            checksum = hashlib.sha256()
+
+            def write_data(inst, data, offset):
+                checksum.update(data)
+                test_file.write(data)
+
+            self.stubs.Set(service.rbd.Image, 'write', write_data)
+
+            service.backup(backup, self.volume_file)
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_restore(self):
+        service = CephBackupService(self.ctxt)
+        self._create_volume_db_entry(self.vol_id, 1)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            self.volume_file.seek(0)
+
+            def read_data(inst, offset, length):
+                return self.volume_file.read(self.length)
+
+            self.stubs.Set(service.rbd.Image, 'read', read_data)
+
+            service.restore(backup, self.vol_id, test_file)
+
+            checksum = hashlib.sha256()
+            test_file.seek(0)
+            for c in xrange(0, self.num_chunks):
+                checksum.update(test_file.read(self.chunk_size))
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_delete(self):
+        service = CephBackupService(self.ctxt)
+        self._create_volume_db_entry(self.vol_id, 1)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        # Must be something mutable
+        remove_called = []
+
+        def remove(inst, ioctx, name):
+            remove_called.append(True)
+
+        self.stubs.Set(service.rbd.RBD, 'remove', remove)
+        service.delete(backup)
+        self.assertTrue(remove_called[0])
index 67e84c889de021aefea5535611800f090d431fcf..2eb23b51dc111d423ead4c53e6b5e4ed430ef6fc 100644 (file)
 #backup_compression_algorithm=zlib
 
 
+#
+# Options defined in cinder.backup.services.ceph
+#
+
+# The configuration file to use for the backup cluster (string value)
+#backup_ceph_conf=/etc/ceph/ceph.conf
+
+# The Ceph user with permissions to access the backup pool (string value)
+#backup_ceph_user=cinder
+
+# The RADOS pool in which volume backups are stored (string value)
+#backup_ceph_pool=backups
+
+# The RBD stripe unit to use when creating a backup image (integer value)
+#backup_ceph_stripe_unit=0
+
+# The RBD stripe count to use when creating a backup image (integer value)
+#backup_ceph_stripe_count=0
+
+# The chunk size used to break up the data when transferring to Ceph object
+# store.
+#backup_ceph_chunk_size=134217728
+
 #
 # Options defined in cinder.db.api
 #