From: Edward Hope-Morley Date: Fri, 25 Oct 2013 17:57:55 +0000 (-0700) Subject: Use pipe between ceph backup diff export/import X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=d384d28e1c2c7c3ce0bd676fb1c2bdb8a2d98a03;p=openstack-build%2Fcinder-build.git Use pipe between ceph backup diff export/import We now use a piped transfer between the rbd export-diff and import-diff for incremental backups/restores as opposed to holding the entire diff in memory. Change-Id: I33476d9b3934781413af5cd2867a11d825a5d78e Fixes: bug 1244464 --- diff --git a/cinder/backup/drivers/ceph.py b/cinder/backup/drivers/ceph.py index 16ec821b9..94d5a401d 100644 --- a/cinder/backup/drivers/ceph.py +++ b/cinder/backup/drivers/ceph.py @@ -41,8 +41,10 @@ was deemed the safest action to take. It is therefore recommended to always restore to a new volume (default). """ +import fcntl import os import re +import subprocess import time import eventlet @@ -51,7 +53,6 @@ from oslo.config import cfg from cinder.backup.driver import BackupDriver from cinder import exception from cinder.openstack.common import log as logging -from cinder.openstack.common import processutils from cinder import units from cinder import utils import cinder.volume.drivers.rbd as rbd_driver @@ -410,6 +411,36 @@ class CephBackupDriver(BackupDriver): finally: src_rbd.close() + def _piped_execute(self, cmd1, cmd2): + """Pipe output of cmd1 into cmd2.""" + LOG.debug("piping cmd1='%s' into..." % (' '.join(cmd1))) + LOG.debug("cmd2='%s'" % (' '.join(cmd2))) + + try: + p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError as e: + LOG.error("pipe1 failed - %s " % unicode(e)) + raise + + # NOTE(dosaboy): ensure that the pipe is blocking. This is to work + # around the case where evenlet.green.subprocess is used which seems to + # use a non-blocking pipe. + flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK) + fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags) + + try: + p2 = subprocess.Popen(cmd2, stdin=p1.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError as e: + LOG.error("pipe2 failed - %s " % unicode(e)) + raise + + p1.stdout.close() + stdout, stderr = p2.communicate() + return p2.returncode, stderr + def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool, src_user, src_conf, dest_user, dest_conf, src_snap=None, from_snap=None): @@ -430,29 +461,22 @@ class CephBackupDriver(BackupDriver): src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool) dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool) - cmd = ['rbd', 'export-diff'] + src_ceph_args + cmd1 = ['rbd', 'export-diff'] + src_ceph_args if from_snap is not None: - cmd.extend(['--from-snap', from_snap]) + cmd1.extend(['--from-snap', from_snap]) if src_snap: path = self._utf8("%s/%s@%s" % (src_pool, src_name, src_snap)) else: path = self._utf8("%s/%s" % (src_pool, src_name)) - cmd.extend([path, '-']) - try: - out, err = self._execute(*cmd) - except (processutils.ProcessExecutionError, - processutils.UnknownArgumentError) as exc: - msg = _("rbd export-diff failed - %s") % (str(exc)) - LOG.info(msg) - raise exception.BackupRBDOperationFailed(msg) + cmd1.extend([path, '-']) - cmd = ['rbd', 'import-diff'] + dest_ceph_args - cmd.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))]) - try: - out, err = self._execute(*cmd, process_input=out) - except (processutils.ProcessExecutionError, - processutils.UnknownArgumentError) as exc: - msg = _("rbd import-diff failed - %s") % (str(exc)) + cmd2 = ['rbd', 'import-diff'] + dest_ceph_args + cmd2.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))]) + + ret, stderr = self._piped_execute(cmd1, cmd2) + if ret: + msg = (_("rbd diff op failed - (ret=%(ret)s stderr=%(stderr)s)") % + ({'ret': ret, 'stderr': stderr})) LOG.info(msg) raise exception.BackupRBDOperationFailed(msg) diff --git a/cinder/tests/test_backup_ceph.py b/cinder/tests/test_backup_ceph.py index 43a7331b3..55dbe6dde 100644 --- a/cinder/tests/test_backup_ceph.py +++ b/cinder/tests/test_backup_ceph.py @@ -14,8 +14,10 @@ # under the License. """ Tests for Ceph backup service """ +import fcntl import hashlib import os +import subprocess import tempfile import time import uuid @@ -60,6 +62,31 @@ class BackupCephTestCase(test.TestCase): 'user_foo', 'conf_foo') return rbddriver.RBDImageIOWrapper(rbd_meta) + def _setup_mock_popen(self, inst, retval=None, p1hook=None, p2hook=None): + class stdout(object): + def close(self): + inst.called.append('stdout_close') + + class FakePopen(object): + + PASS = 0 + + def __init__(self, cmd, *args, **kwargs): + inst.called.append('popen_init') + self.stdout = stdout() + self.returncode = 0 + self.__class__.PASS += 1 + if self.__class__.PASS == 1 and p1hook: + p1hook() + elif self.__class__.PASS == 2 and p2hook: + p2hook() + + def communicate(self): + inst.called.append('communicate') + return retval + + self.stubs.Set(subprocess, 'Popen', FakePopen) + def setUp(self): super(BackupCephTestCase, self).setUp() self.ctxt = context.get_admin_context() @@ -101,6 +128,13 @@ class BackupCephTestCase(test.TestCase): self.stubs.Set(time, 'time', self.time_inc) self.stubs.Set(eventlet, 'sleep', lambda *args: None) + # Used to collect info on what was called during a test + self.called = [] + + # Do this to ensure that any test ending up in a subprocess fails if + # not properly mocked. + self.stubs.Set(subprocess, 'Popen', None) + def test_get_rbd_support(self): self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING')) self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2')) @@ -327,21 +361,30 @@ class BackupCephTestCase(test.TestCase): self.stubs.Set(self.service, '_try_delete_base_image', lambda *args, **kwargs: None) + self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0) + with tempfile.NamedTemporaryFile() as test_file: checksum = hashlib.sha256() - def write_data(inst, data, offset): + def write_data(): + self.volume_file.seek(0) + data = self.volume_file.read(self.length) + self.called.append('write') checksum.update(data) test_file.write(data) - def read_data(inst, offset, length): + def read_data(): + self.called.append('read') return self.volume_file.read(self.length) def rbd_list(inst, ioctx): + self.called.append('list') return [backup_name] - self.stubs.Set(self.service.rbd.Image, 'read', read_data) - self.stubs.Set(self.service.rbd.Image, 'write', write_data) + self._setup_mock_popen(self, ['out', 'err'], + p1hook=read_data, + p2hook=write_data) + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) self.stubs.Set(self.service, '_discard_bytes', @@ -354,6 +397,10 @@ class BackupCephTestCase(test.TestCase): self.service.backup(backup, rbd_io) + self.assertEquals(self.called, ['list', 'popen_init', 'read', + 'popen_init', 'write', + 'stdout_close', 'communicate']) + # Ensure the files are equal self.assertEqual(checksum.digest(), self.checksum.digest()) @@ -463,15 +510,12 @@ class BackupCephTestCase(test.TestCase): self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps) self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) - # Must be something mutable - remove_called = [] - def remove(inst, ioctx, name): - remove_called.append(True) + self.called.append('remove') self.stubs.Set(self.service.rbd.RBD, 'remove', remove) self.service.delete(backup) - self.assertTrue(remove_called[0]) + self.assertEquals(self.called, ['remove']) def test_try_delete_base_image(self): # don't create volume db entry since it should not be required @@ -486,18 +530,15 @@ class BackupCephTestCase(test.TestCase): self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) - # Must be something mutable - remove_called = [] - self.stubs.Set(self.service, 'get_backup_snaps', lambda *args, **kwargs: None) def remove(inst, ioctx, name): - remove_called.append(True) + self.called.append('remove') self.stubs.Set(self.service.rbd.RBD, 'remove', remove) self.service.delete(backup) - self.assertTrue(remove_called[0]) + self.assertEquals(self.called, ['remove']) def test_try_delete_base_image_busy(self): """This should induce retries then raise rbd.ImageBusy.""" @@ -513,9 +554,6 @@ class BackupCephTestCase(test.TestCase): self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) - # Must be something mutable - remove_called = [] - self.stubs.Set(self.service, 'get_backup_snaps', lambda *args, **kwargs: None) @@ -616,6 +654,13 @@ class BackupCephTestCase(test.TestCase): self.assertEqual(resp, not_allowed) self._set_service_stub('_file_is_rbd', True) + def test_piped_execute(self): + self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0) + self._setup_mock_popen(self, ['out', 'err']) + self.service._piped_execute(['foo'], ['bar']) + self.assertEquals(self.called, ['popen_init', 'popen_init', + 'stdout_close', 'communicate']) + def tearDown(self): self.volume_file.close() self.stubs.UnsetAll()