From: Edward Hope-Morley Date: Tue, 25 Jun 2013 09:11:11 +0000 (+0100) Subject: Added Cinder volume backup to Ceph support X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=e96b3e5ffcd8d01c461bc915e3eaebecab1516b9;p=openstack-build%2Fcinder-build.git Added Cinder volume backup to Ceph support Added new Ceph backup service to allow backup of Cinder volumes to a Ceph object store. This driver is compatible with the existing backup interface provided by the Swift backup service. Implements: blueprint cinder-backup-to-ceph Change-Id: I299f033347cb263e7169c4a4efb758d19e753f46 --- diff --git a/cinder/backup/api.py b/cinder/backup/api.py index 325f86231..644de210c 100644 --- a/cinder/backup/api.py +++ b/cinder/backup/api.py @@ -133,7 +133,7 @@ class API(base.Base): # it is large enough for the backup if volume_id is None: name = 'restore_backup_%s' % backup_id - description = 'auto-created_from_restore_from_swift' + description = 'auto-created_from_restore_from_backup' LOG.audit(_("Creating volume of %(size)s GB for restore of " "backup %(backup_id)s"), diff --git a/cinder/backup/manager.py b/cinder/backup/manager.py index 08b9d4942..471445467 100755 --- a/cinder/backup/manager.py +++ b/cinder/backup/manager.py @@ -16,8 +16,9 @@ """ Backup manager manages volume backups. -Volume Backups are full copies of persistent volumes stored in Swift object -storage. They are usable without the original object being available. A +Volume Backups are full copies of persistent volumes stored in a backup +store e.g. an object store or any other backup store if and when support is +added. They are usable without the original object being available. A volume backup can be restored to the original volume it was created from or any other available volume with a minimum size of the original volume. Volume backups can be created, restored, deleted and listed. diff --git a/cinder/backup/services/ceph.py b/cinder/backup/services/ceph.py new file mode 100644 index 000000000..9ff65c115 --- /dev/null +++ b/cinder/backup/services/ceph.py @@ -0,0 +1,275 @@ +# Copyright 2013 Canonical Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Ceph Backup Service Implementation""" + +from cinder.db import base +from cinder import exception +from cinder.openstack.common import log as logging +from cinder import units +import cinder.volume.drivers.rbd as rbddriver +import eventlet +import os +from oslo.config import cfg +import time + +try: + import rados + import rbd +except ImportError: + rados = None + rbd = None + +LOG = logging.getLogger(__name__) + +service_opts = [ + cfg.StrOpt('backup_ceph_conf', default='/etc/ceph/ceph.conf', + help='Ceph config file to use.'), + cfg.StrOpt('backup_ceph_user', default='cinder', + help='the Ceph user to connect with'), + cfg.StrOpt('backup_ceph_chunk_size', default=(units.MiB * 128), + help='the chunk size in bytes that a backup will be broken ' + 'into before transfer to backup store'), + cfg.StrOpt('backup_ceph_pool', default='backups', + help='the Ceph pool to backup to'), + cfg.StrOpt('backup_ceph_stripe_unit', default=0, + help='RBD stripe unit to use when creating a backup image'), + cfg.StrOpt('backup_ceph_stripe_count', default=0, + help='RBD stripe count to use when creating a backup image') +] + +CONF = cfg.CONF +CONF.register_opts(service_opts) + + +class CephBackupService(base.Base): + """Backup up Cinder volumes to Ceph Object Store""" + + def __init__(self, context, db_driver=None): + super(CephBackupService, self).__init__(db_driver) + self.rbd = rbd + self.rados = rados + self.context = context + self.chunk_size = CONF.backup_ceph_chunk_size + if self._supports_stripingv2(): + self.rbd_stripe_unit = int(CONF.backup_ceph_stripe_unit) + self.rbd_stripe_count = int(CONF.backup_ceph_stripe_count) + else: + LOG.info("rbd striping not supported - ignoring conf settings " + "for rbd striping") + self.rbd_stripe_count = 0 + self.rbd_stripe_unit = 0 + + self._ceph_user = str(CONF.backup_ceph_user) + self._ceph_pool = str(CONF.backup_ceph_pool) + self._ceph_conf = str(CONF.backup_ceph_conf) + + def _supports_layering(self): + """ + Determine whether copy-on-write is supported by our version of librbd + """ + return hasattr(self.rbd, 'RBD_FEATURE_LAYERING') + + def _supports_stripingv2(self): + """ + Determine whether striping is supported by our version of librbd + """ + return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2') + + def _get_rbd_support(self): + old_format = True + features = 0 + if self._supports_layering(): + old_format = False + features |= self.rbd.RBD_FEATURE_LAYERING + if self._supports_stripingv2(): + old_format = False + features |= self.rbd.RBD_FEATURE_STRIPINGV2 + + return (old_format, features) + + def _connect_to_rados(self, pool=None): + """Establish connection to the Ceph cluster""" + client = self.rados.Rados(rados_id=self._ceph_user, + conffile=self._ceph_conf) + try: + client.connect() + pool_to_open = str(pool or self._ceph_pool) + ioctx = client.open_ioctx(pool_to_open) + return client, ioctx + except self.rados.Error: + # shutdown cannot raise an exception + client.shutdown() + raise + + def _disconnect_from_rados(self, client, ioctx): + """Terminate connection with the Ceph cluster""" + # closing an ioctx cannot raise an exception + ioctx.close() + client.shutdown() + + def _get_backup_rbd_name(self, vol_name, backup_id): + """Make sure we use a consistent format for backup names""" + # ensure no unicode + return str("%s.backup.%s" % (vol_name, backup_id)) + + def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False): + """ + Transfer data between file and rbd. If destination is rbd, source is + assumed to be file, otherwise source is assumed to be rbd. + """ + chunks = int(length / self.chunk_size) + LOG.debug("transferring %s chunks of %s bytes to '%s'" % + (chunks, self.chunk_size, dest_name)) + for chunk in xrange(0, chunks): + offset = chunk * self.chunk_size + before = time.time() + + if dest_is_rbd: + dest.write(src.read(self.chunk_size), offset) + # note(dosaboy): librbd writes are synchronous so flush() will + # have not effect. Also, flush only supported in more recent + # versions of librbd. + else: + dest.write(src.read(offset, self.chunk_size)) + dest.flush() + + delta = (time.time() - before) + rate = (self.chunk_size / delta) / 1024 + LOG.debug("transferred chunk %s of %s (%dK/s)" % + (chunk, chunks, rate)) + + # yield to any other pending backups + eventlet.sleep(0) + + rem = int(length % self.chunk_size) + if rem: + LOG.debug("transferring remaining %s bytes" % (rem)) + offset = (length - rem) + if dest_is_rbd: + dest.write(src.read(rem), offset) + # note(dosaboy): librbd writes are synchronous so flush() will + # have not effect. Also, flush only supported in more recent + # versions of librbd. + else: + dest.write(src.read(offset, rem)) + dest.flush() + + # yield to any other pending backups + eventlet.sleep(0) + + def _backup_volume_from_file(self, backup_name, backup_size, volume_file): + """Backup a volume from file stream""" + LOG.debug("performing backup from file") + + old_format, features = self._get_rbd_support() + + with rbddriver.RADOSClient(self, self._ceph_pool) as client: + self.rbd.RBD().create(ioctx=client.ioctx, + name=backup_name, + size=backup_size, + old_format=old_format, + features=features, + stripe_unit=self.rbd_stripe_unit, + stripe_count=self.rbd_stripe_count) + + dest_rbd = self.rbd.Image(client.ioctx, backup_name) + try: + self._transfer_data(volume_file, dest_rbd, backup_name, + backup_size, dest_is_rbd=True) + finally: + dest_rbd.close() + + def backup(self, backup, volume_file): + """Backup the given volume to Ceph object store""" + backup_id = backup['id'] + volume = self.db.volume_get(self.context, backup['volume_id']) + backup_name = self._get_backup_rbd_name(volume['name'], backup_id) + + LOG.debug("Starting backup of volume='%s' to rbd='%s'" % + (volume['name'], backup_name)) + + if int(volume['size']) == 0: + raise exception.InvalidParameterValue("need non-zero volume size") + else: + backup_size = int(volume['size']) * units.GiB + + if volume_file: + self._backup_volume_from_file(backup_name, backup_size, + volume_file) + else: + errmsg = ("No volume_file was provided so I cannot do requested " + "backup (id=%s)" % (backup_id)) + raise exception.BackupVolumeInvalidType(errmsg) + + self.db.backup_update(self.context, backup['id'], + {'container': self._ceph_pool}) + + LOG.debug(_("backup '%s' finished.") % (backup_id)) + + def restore(self, backup, volume_id, volume_file): + """Restore the given volume backup from Ceph object store""" + volume_id = backup['volume_id'] + volume = self.db.volume_get(self.context, volume_id) + backup_name = self._get_backup_rbd_name(volume['name'], backup['id']) + + LOG.debug('starting backup restore from Ceph backup=%s ' + 'to volume=%s' % (backup['id'], volume['name'])) + + # Ensure we are at the beginning of the volume + volume_file.seek(0) + + backup_size = int(volume['size']) * units.GiB + + with rbddriver.RADOSClient(self, self._ceph_pool) as client: + src_rbd = self.rbd.Image(client.ioctx, backup_name) + try: + self._transfer_data(src_rbd, volume_file, volume['name'], + backup_size) + finally: + src_rbd.close() + + # Be tolerant to IO implementations that do not support fileno() + try: + fileno = volume_file.fileno() + except IOError: + LOG.info("volume_file does not support fileno() so skipping " + "fsync()") + else: + os.fsync(fileno) + + LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id)) + + def delete(self, backup): + """Delete the given backup from Ceph object store""" + backup_id = backup['id'] + volume_id = backup['volume_id'] + volume = self.db.volume_get(self.context, volume_id) + backup_name = self._get_backup_rbd_name(volume['name'], backup_id) + + LOG.debug('delete started for backup=%s', backup['id']) + + try: + with rbddriver.RADOSClient(self) as client: + self.rbd.RBD().remove(client.ioctx, backup_name) + except self.rbd.ImageNotFound: + LOG.warning("rbd image '%s' not found but continuing anyway so " + "that db entry can be removed" % (backup_name)) + + LOG.debug(_("delete '%s' finished") % (backup_id)) + + +def get_backup_service(context): + return CephBackupService(context) diff --git a/cinder/exception.py b/cinder/exception.py index ccfc95c19..ac0cc157a 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -564,6 +564,10 @@ class ImageCopyFailure(Invalid): message = _("Failed to copy image to volume: %(reason)s") +class BackupVolumeInvalidType(Invalid): + message = _("Backup volume %(volume_id)s type not recognised.") + + class BackupNotFound(NotFound): message = _("Backup %(backup_id)s could not be found.") diff --git a/cinder/tests/backup/fake_rados.py b/cinder/tests/backup/fake_rados.py new file mode 100644 index 000000000..2169cc05f --- /dev/null +++ b/cinder/tests/backup/fake_rados.py @@ -0,0 +1,77 @@ +# Copyright 2013 Canonical Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class mock_rados(object): + + class mock_ioctx(object): + def __init__(self, *args, **kwargs): + pass + + def close(self, *args, **kwargs): + pass + + class Rados(object): + + def __init__(self, *args, **kwargs): + pass + + def connect(self, *args, **kwargs): + pass + + def open_ioctx(self, *args, **kwargs): + return mock_rados.mock_ioctx() + + def shutdown(self, *args, **kwargs): + pass + + class Error(): + def __init__(self, *args, **kwargs): + pass + + +class mock_rbd(object): + + class Image(object): + + def __init__(self, *args, **kwargs): + pass + + def read(self, *args, **kwargs): + pass + + def write(self, *args, **kwargs): + pass + + def resize(self, *args, **kwargs): + pass + + def close(self, *args, **kwargs): + pass + + class RBD(object): + + def __init__(self, *args, **kwargs): + pass + + def create(self, *args, **kwargs): + pass + + def remove(self, *args, **kwargs): + pass + + class ImageNotFound(Exception): + def __init__(self, *args, **kwargs): + pass diff --git a/cinder/tests/test_backup_ceph.py b/cinder/tests/test_backup_ceph.py new file mode 100644 index 000000000..b65e474f0 --- /dev/null +++ b/cinder/tests/test_backup_ceph.py @@ -0,0 +1,228 @@ +# Copyright 2013 Canonical Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" Tests for Ceph backup service """ + +import hashlib +import os +import tempfile +import uuid + +from cinder.backup.services.ceph import CephBackupService +from cinder.tests.backup.fake_rados import mock_rados +from cinder.tests.backup.fake_rados import mock_rbd + +from cinder.backup.services import ceph +from cinder import context +from cinder import db +from cinder import exception +from cinder.openstack.common import log as logging +from cinder import test + +LOG = logging.getLogger(__name__) + + +class BackupCephTestCase(test.TestCase): + """Test Case for backup to Ceph object store""" + + def _create_volume_db_entry(self, id, size): + vol = {'id': id, 'size': size, 'status': 'available'} + return db.volume_create(self.ctxt, vol)['id'] + + def _create_backup_db_entry(self, backupid, volid, size): + backup = {'id': backupid, 'size': size, 'volume_id': volid} + return db.backup_create(self.ctxt, backup)['id'] + + def setUp(self): + super(BackupCephTestCase, self).setUp() + self.ctxt = context.get_admin_context() + + self.vol_id = str(uuid.uuid4()) + self.backup_id = str(uuid.uuid4()) + + # Setup librbd stubs + self.stubs.Set(ceph, 'rados', mock_rados) + self.stubs.Set(ceph, 'rbd', mock_rbd) + + self._create_backup_db_entry(self.backup_id, self.vol_id, 1) + + self.chunk_size = 1024 + self.num_chunks = 128 + self.length = self.num_chunks * self.chunk_size + + self.checksum = hashlib.sha256() + + # Create a file with some data in it + self.volume_file = tempfile.NamedTemporaryFile() + for i in xrange(0, self.num_chunks): + data = os.urandom(self.chunk_size) + self.checksum.update(data) + self.volume_file.write(data) + + self.volume_file.seek(0) + + def test_get_rbd_support(self): + service = CephBackupService(self.ctxt) + + self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING')) + self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2')) + + oldformat, features = service._get_rbd_support() + self.assertTrue(oldformat) + self.assertEquals(features, 0) + + service.rbd.RBD_FEATURE_LAYERING = 1 + + oldformat, features = service._get_rbd_support() + self.assertFalse(oldformat) + self.assertEquals(features, 1) + + service.rbd.RBD_FEATURE_STRIPINGV2 = 2 + + oldformat, features = service._get_rbd_support() + self.assertFalse(oldformat) + self.assertEquals(features, 1 | 2) + + def test_tranfer_data_from_rbd(self): + service = CephBackupService(self.ctxt) + + with tempfile.NamedTemporaryFile() as test_file: + self.volume_file.seek(0) + + def read_data(inst, offset, length): + return self.volume_file.read(self.length) + + self.stubs.Set(service.rbd.Image, 'read', read_data) + + service._transfer_data(service.rbd.Image(), test_file, 'foo', + self.length) + + checksum = hashlib.sha256() + test_file.seek(0) + for c in xrange(0, self.num_chunks): + checksum.update(test_file.read(self.chunk_size)) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_tranfer_data_to_rbd(self): + service = CephBackupService(self.ctxt) + + with tempfile.NamedTemporaryFile() as test_file: + checksum = hashlib.sha256() + + def write_data(inst, data, offset): + checksum.update(data) + test_file.write(data) + + self.stubs.Set(service.rbd.Image, 'write', write_data) + + service._transfer_data(self.volume_file, service.rbd.Image(), + 'foo', self.length, dest_is_rbd=True) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_backup_volume_from_file(self): + service = CephBackupService(self.ctxt) + + with tempfile.NamedTemporaryFile() as test_file: + checksum = hashlib.sha256() + + def write_data(inst, data, offset): + checksum.update(data) + test_file.write(data) + + self.stubs.Set(service.rbd.Image, 'write', write_data) + + service._backup_volume_from_file('foo', self.length, + self.volume_file) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def tearDown(self): + self.volume_file.close() + super(BackupCephTestCase, self).tearDown() + + def test_backup_error1(self): + service = CephBackupService(self.ctxt) + backup = db.backup_get(self.ctxt, self.backup_id) + self._create_volume_db_entry(self.vol_id, 0) + self.assertRaises(exception.InvalidParameterValue, service.backup, + backup, self.volume_file) + + def test_backup_error2(self): + service = CephBackupService(self.ctxt) + backup = db.backup_get(self.ctxt, self.backup_id) + self._create_volume_db_entry(self.vol_id, 1) + self.assertRaises(exception.BackupVolumeInvalidType, service.backup, + backup, None) + + def test_backup_good(self): + service = CephBackupService(self.ctxt) + backup = db.backup_get(self.ctxt, self.backup_id) + self._create_volume_db_entry(self.vol_id, 1) + + with tempfile.NamedTemporaryFile() as test_file: + checksum = hashlib.sha256() + + def write_data(inst, data, offset): + checksum.update(data) + test_file.write(data) + + self.stubs.Set(service.rbd.Image, 'write', write_data) + + service.backup(backup, self.volume_file) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_restore(self): + service = CephBackupService(self.ctxt) + self._create_volume_db_entry(self.vol_id, 1) + backup = db.backup_get(self.ctxt, self.backup_id) + + with tempfile.NamedTemporaryFile() as test_file: + self.volume_file.seek(0) + + def read_data(inst, offset, length): + return self.volume_file.read(self.length) + + self.stubs.Set(service.rbd.Image, 'read', read_data) + + service.restore(backup, self.vol_id, test_file) + + checksum = hashlib.sha256() + test_file.seek(0) + for c in xrange(0, self.num_chunks): + checksum.update(test_file.read(self.chunk_size)) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_delete(self): + service = CephBackupService(self.ctxt) + self._create_volume_db_entry(self.vol_id, 1) + backup = db.backup_get(self.ctxt, self.backup_id) + + # Must be something mutable + remove_called = [] + + def remove(inst, ioctx, name): + remove_called.append(True) + + self.stubs.Set(service.rbd.RBD, 'remove', remove) + service.delete(backup) + self.assertTrue(remove_called[0]) diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index 67e84c889..2eb23b51d 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -362,6 +362,29 @@ #backup_compression_algorithm=zlib +# +# Options defined in cinder.backup.services.ceph +# + +# The configration file to use for the backup cluster (string value) +#backup_ceph_conf=/etc/ceph/ceph.conf + +# The Ceph user with permissions to access the backup pool (string value) +#backup_ceph_user=cinder + +# The RADOS pool in which volume backups are stored (string value) +#backup_ceph_pool=backups + +# The RBD stripe unit to use when creating a backup image (integer value) +#backup_ceph_stripe_unit=0 + +# The RBD stripe count to use when creating a backup image (integer value) +#backup_ceph_stripe_count=0 + +# The chunk size used to break up the data when transferring to Ceph object +# store. +#backup_ceph_chunk_size=134217728 + # # Options defined in cinder.db.api #