]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Use pipe between ceph backup diff export/import
authorEdward Hope-Morley <edward.hope-morley@canonical.com>
Fri, 25 Oct 2013 17:57:55 +0000 (10:57 -0700)
committerEdward Hope-Morley <edward.hope-morley@canonical.com>
Tue, 29 Oct 2013 19:14:32 +0000 (19:14 +0000)
We now use a piped transfer between the rbd export-diff
and import-diff for incremental backups/restores as
opposed to holding the entire diff in memory.

Change-Id: I33476d9b3934781413af5cd2867a11d825a5d78e
Fixes: bug 1244464
cinder/backup/drivers/ceph.py
cinder/tests/test_backup_ceph.py

index 16ec821b9554c650e73519e381ecab26f9dd5b8f..94d5a401db174a21d0905eb3c2c9cc4bf97f3a11 100644 (file)
@@ -41,8 +41,10 @@ was deemed the safest action to take. It is therefore recommended to always
 restore to a new volume (default).
 """
 
+import fcntl
 import os
 import re
+import subprocess
 import time
 
 import eventlet
@@ -51,7 +53,6 @@ from oslo.config import cfg
 from cinder.backup.driver import BackupDriver
 from cinder import exception
 from cinder.openstack.common import log as logging
-from cinder.openstack.common import processutils
 from cinder import units
 from cinder import utils
 import cinder.volume.drivers.rbd as rbd_driver
@@ -410,6 +411,36 @@ class CephBackupDriver(BackupDriver):
                 finally:
                     src_rbd.close()
 
+    def _piped_execute(self, cmd1, cmd2):
+        """Pipe output of cmd1 into cmd2."""
+        LOG.debug("piping cmd1='%s' into..." % (' '.join(cmd1)))
+        LOG.debug("cmd2='%s'" % (' '.join(cmd2)))
+
+        try:
+            p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        except OSError as e:
+            LOG.error("pipe1 failed - %s " % unicode(e))
+            raise
+
+        # NOTE(dosaboy): ensure that the pipe is blocking. This is to work
+        # around the case where evenlet.green.subprocess is used which seems to
+        # use a non-blocking pipe.
+        flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK)
+        fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags)
+
+        try:
+            p2 = subprocess.Popen(cmd2, stdin=p1.stdout,
+                                  stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        except OSError as e:
+            LOG.error("pipe2 failed - %s " % unicode(e))
+            raise
+
+        p1.stdout.close()
+        stdout, stderr = p2.communicate()
+        return p2.returncode, stderr
+
     def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
                            src_user, src_conf, dest_user, dest_conf,
                            src_snap=None, from_snap=None):
@@ -430,29 +461,22 @@ class CephBackupDriver(BackupDriver):
         src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
         dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
 
-        cmd = ['rbd', 'export-diff'] + src_ceph_args
+        cmd1 = ['rbd', 'export-diff'] + src_ceph_args
         if from_snap is not None:
-            cmd.extend(['--from-snap', from_snap])
+            cmd1.extend(['--from-snap', from_snap])
         if src_snap:
             path = self._utf8("%s/%s@%s" % (src_pool, src_name, src_snap))
         else:
             path = self._utf8("%s/%s" % (src_pool, src_name))
-        cmd.extend([path, '-'])
-        try:
-            out, err = self._execute(*cmd)
-        except (processutils.ProcessExecutionError,
-                processutils.UnknownArgumentError) as exc:
-            msg = _("rbd export-diff failed - %s") % (str(exc))
-            LOG.info(msg)
-            raise exception.BackupRBDOperationFailed(msg)
+        cmd1.extend([path, '-'])
 
-        cmd = ['rbd', 'import-diff'] + dest_ceph_args
-        cmd.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
-        try:
-            out, err = self._execute(*cmd, process_input=out)
-        except (processutils.ProcessExecutionError,
-                processutils.UnknownArgumentError) as exc:
-            msg = _("rbd import-diff failed - %s") % (str(exc))
+        cmd2 = ['rbd', 'import-diff'] + dest_ceph_args
+        cmd2.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
+
+        ret, stderr = self._piped_execute(cmd1, cmd2)
+        if ret:
+            msg = (_("rbd diff op failed - (ret=%(ret)s stderr=%(stderr)s)") %
+                   ({'ret': ret, 'stderr': stderr}))
             LOG.info(msg)
             raise exception.BackupRBDOperationFailed(msg)
 
index 43a7331b36c6e437c84bdc6b5a4851d1afe1ee68..55dbe6dde6529012560edf5e36e4f810650d7dbb 100644 (file)
 #    under the License.
 """ Tests for Ceph backup service """
 
+import fcntl
 import hashlib
 import os
+import subprocess
 import tempfile
 import time
 import uuid
@@ -60,6 +62,31 @@ class BackupCephTestCase(test.TestCase):
                                               'user_foo', 'conf_foo')
         return rbddriver.RBDImageIOWrapper(rbd_meta)
 
+    def _setup_mock_popen(self, inst, retval=None, p1hook=None, p2hook=None):
+        class stdout(object):
+            def close(self):
+                inst.called.append('stdout_close')
+
+        class FakePopen(object):
+
+            PASS = 0
+
+            def __init__(self, cmd, *args, **kwargs):
+                inst.called.append('popen_init')
+                self.stdout = stdout()
+                self.returncode = 0
+                self.__class__.PASS += 1
+                if self.__class__.PASS == 1 and p1hook:
+                    p1hook()
+                elif self.__class__.PASS == 2 and p2hook:
+                    p2hook()
+
+            def communicate(self):
+                inst.called.append('communicate')
+                return retval
+
+        self.stubs.Set(subprocess, 'Popen', FakePopen)
+
     def setUp(self):
         super(BackupCephTestCase, self).setUp()
         self.ctxt = context.get_admin_context()
@@ -101,6 +128,13 @@ class BackupCephTestCase(test.TestCase):
         self.stubs.Set(time, 'time', self.time_inc)
         self.stubs.Set(eventlet, 'sleep', lambda *args: None)
 
+        # Used to collect info on what was called during a test
+        self.called = []
+
+        # Do this to ensure that any test ending up in a subprocess fails if
+        # not properly mocked.
+        self.stubs.Set(subprocess, 'Popen', None)
+
     def test_get_rbd_support(self):
         self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
         self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
@@ -327,21 +361,30 @@ class BackupCephTestCase(test.TestCase):
         self.stubs.Set(self.service, '_try_delete_base_image',
                        lambda *args, **kwargs: None)
 
+        self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
+
         with tempfile.NamedTemporaryFile() as test_file:
             checksum = hashlib.sha256()
 
-            def write_data(inst, data, offset):
+            def write_data():
+                self.volume_file.seek(0)
+                data = self.volume_file.read(self.length)
+                self.called.append('write')
                 checksum.update(data)
                 test_file.write(data)
 
-            def read_data(inst, offset, length):
+            def read_data():
+                self.called.append('read')
                 return self.volume_file.read(self.length)
 
             def rbd_list(inst, ioctx):
+                self.called.append('list')
                 return [backup_name]
 
-            self.stubs.Set(self.service.rbd.Image, 'read', read_data)
-            self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+            self._setup_mock_popen(self, ['out', 'err'],
+                                   p1hook=read_data,
+                                   p2hook=write_data)
+
             self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
 
             self.stubs.Set(self.service, '_discard_bytes',
@@ -354,6 +397,10 @@ class BackupCephTestCase(test.TestCase):
 
             self.service.backup(backup, rbd_io)
 
+            self.assertEquals(self.called, ['list', 'popen_init', 'read',
+                                            'popen_init', 'write',
+                                            'stdout_close', 'communicate'])
+
             # Ensure the files are equal
             self.assertEqual(checksum.digest(), self.checksum.digest())
 
@@ -463,15 +510,12 @@ class BackupCephTestCase(test.TestCase):
         self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
         self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
 
-        # Must be something mutable
-        remove_called = []
-
         def remove(inst, ioctx, name):
-            remove_called.append(True)
+            self.called.append('remove')
 
         self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
         self.service.delete(backup)
-        self.assertTrue(remove_called[0])
+        self.assertEquals(self.called, ['remove'])
 
     def test_try_delete_base_image(self):
         # don't create volume db entry since it should not be required
@@ -486,18 +530,15 @@ class BackupCephTestCase(test.TestCase):
 
         self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
 
-        # Must be something mutable
-        remove_called = []
-
         self.stubs.Set(self.service, 'get_backup_snaps',
                        lambda *args, **kwargs: None)
 
         def remove(inst, ioctx, name):
-            remove_called.append(True)
+            self.called.append('remove')
 
         self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
         self.service.delete(backup)
-        self.assertTrue(remove_called[0])
+        self.assertEquals(self.called, ['remove'])
 
     def test_try_delete_base_image_busy(self):
         """This should induce retries then raise rbd.ImageBusy."""
@@ -513,9 +554,6 @@ class BackupCephTestCase(test.TestCase):
 
         self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
 
-        # Must be something mutable
-        remove_called = []
-
         self.stubs.Set(self.service, 'get_backup_snaps',
                        lambda *args, **kwargs: None)
 
@@ -616,6 +654,13 @@ class BackupCephTestCase(test.TestCase):
         self.assertEqual(resp, not_allowed)
         self._set_service_stub('_file_is_rbd', True)
 
+    def test_piped_execute(self):
+        self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
+        self._setup_mock_popen(self, ['out', 'err'])
+        self.service._piped_execute(['foo'], ['bar'])
+        self.assertEquals(self.called, ['popen_init', 'popen_init',
+                                        'stdout_close', 'communicate'])
+
     def tearDown(self):
         self.volume_file.close()
         self.stubs.UnsetAll()