restore to a new volume (default).
"""
+import fcntl
import os
import re
+import subprocess
import time
import eventlet
from cinder.backup.driver import BackupDriver
from cinder import exception
from cinder.openstack.common import log as logging
-from cinder.openstack.common import processutils
from cinder import units
from cinder import utils
import cinder.volume.drivers.rbd as rbd_driver
finally:
src_rbd.close()
+ def _piped_execute(self, cmd1, cmd2):
+ """Pipe output of cmd1 into cmd2."""
+ LOG.debug("piping cmd1='%s' into..." % (' '.join(cmd1)))
+ LOG.debug("cmd2='%s'" % (' '.join(cmd2)))
+
+ try:
+ p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ except OSError as e:
+ LOG.error("pipe1 failed - %s " % unicode(e))
+ raise
+
+ # NOTE(dosaboy): ensure that the pipe is blocking. This is to work
+ # around the case where evenlet.green.subprocess is used which seems to
+ # use a non-blocking pipe.
+ flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK)
+ fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags)
+
+ try:
+ p2 = subprocess.Popen(cmd2, stdin=p1.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ except OSError as e:
+ LOG.error("pipe2 failed - %s " % unicode(e))
+ raise
+
+ p1.stdout.close()
+ stdout, stderr = p2.communicate()
+ return p2.returncode, stderr
+
def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_user, src_conf, dest_user, dest_conf,
src_snap=None, from_snap=None):
src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
- cmd = ['rbd', 'export-diff'] + src_ceph_args
+ cmd1 = ['rbd', 'export-diff'] + src_ceph_args
if from_snap is not None:
- cmd.extend(['--from-snap', from_snap])
+ cmd1.extend(['--from-snap', from_snap])
if src_snap:
path = self._utf8("%s/%s@%s" % (src_pool, src_name, src_snap))
else:
path = self._utf8("%s/%s" % (src_pool, src_name))
- cmd.extend([path, '-'])
- try:
- out, err = self._execute(*cmd)
- except (processutils.ProcessExecutionError,
- processutils.UnknownArgumentError) as exc:
- msg = _("rbd export-diff failed - %s") % (str(exc))
- LOG.info(msg)
- raise exception.BackupRBDOperationFailed(msg)
+ cmd1.extend([path, '-'])
- cmd = ['rbd', 'import-diff'] + dest_ceph_args
- cmd.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
- try:
- out, err = self._execute(*cmd, process_input=out)
- except (processutils.ProcessExecutionError,
- processutils.UnknownArgumentError) as exc:
- msg = _("rbd import-diff failed - %s") % (str(exc))
+ cmd2 = ['rbd', 'import-diff'] + dest_ceph_args
+ cmd2.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
+
+ ret, stderr = self._piped_execute(cmd1, cmd2)
+ if ret:
+ msg = (_("rbd diff op failed - (ret=%(ret)s stderr=%(stderr)s)") %
+ ({'ret': ret, 'stderr': stderr}))
LOG.info(msg)
raise exception.BackupRBDOperationFailed(msg)
# under the License.
""" Tests for Ceph backup service """
+import fcntl
import hashlib
import os
+import subprocess
import tempfile
import time
import uuid
'user_foo', 'conf_foo')
return rbddriver.RBDImageIOWrapper(rbd_meta)
+ def _setup_mock_popen(self, inst, retval=None, p1hook=None, p2hook=None):
+ class stdout(object):
+ def close(self):
+ inst.called.append('stdout_close')
+
+ class FakePopen(object):
+
+ PASS = 0
+
+ def __init__(self, cmd, *args, **kwargs):
+ inst.called.append('popen_init')
+ self.stdout = stdout()
+ self.returncode = 0
+ self.__class__.PASS += 1
+ if self.__class__.PASS == 1 and p1hook:
+ p1hook()
+ elif self.__class__.PASS == 2 and p2hook:
+ p2hook()
+
+ def communicate(self):
+ inst.called.append('communicate')
+ return retval
+
+ self.stubs.Set(subprocess, 'Popen', FakePopen)
+
def setUp(self):
super(BackupCephTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.stubs.Set(time, 'time', self.time_inc)
self.stubs.Set(eventlet, 'sleep', lambda *args: None)
+ # Used to collect info on what was called during a test
+ self.called = []
+
+ # Do this to ensure that any test ending up in a subprocess fails if
+ # not properly mocked.
+ self.stubs.Set(subprocess, 'Popen', None)
+
def test_get_rbd_support(self):
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args, **kwargs: None)
+ self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
+
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
- def write_data(inst, data, offset):
+ def write_data():
+ self.volume_file.seek(0)
+ data = self.volume_file.read(self.length)
+ self.called.append('write')
checksum.update(data)
test_file.write(data)
- def read_data(inst, offset, length):
+ def read_data():
+ self.called.append('read')
return self.volume_file.read(self.length)
def rbd_list(inst, ioctx):
+ self.called.append('list')
return [backup_name]
- self.stubs.Set(self.service.rbd.Image, 'read', read_data)
- self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+ self._setup_mock_popen(self, ['out', 'err'],
+ p1hook=read_data,
+ p2hook=write_data)
+
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
self.stubs.Set(self.service, '_discard_bytes',
self.service.backup(backup, rbd_io)
+ self.assertEquals(self.called, ['list', 'popen_init', 'read',
+ 'popen_init', 'write',
+ 'stdout_close', 'communicate'])
+
# Ensure the files are equal
self.assertEqual(checksum.digest(), self.checksum.digest())
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
- # Must be something mutable
- remove_called = []
-
def remove(inst, ioctx, name):
- remove_called.append(True)
+ self.called.append('remove')
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
- self.assertTrue(remove_called[0])
+ self.assertEquals(self.called, ['remove'])
def test_try_delete_base_image(self):
# don't create volume db entry since it should not be required
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
- # Must be something mutable
- remove_called = []
-
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
def remove(inst, ioctx, name):
- remove_called.append(True)
+ self.called.append('remove')
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
- self.assertTrue(remove_called[0])
+ self.assertEquals(self.called, ['remove'])
def test_try_delete_base_image_busy(self):
"""This should induce retries then raise rbd.ImageBusy."""
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
- # Must be something mutable
- remove_called = []
-
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
self.assertEqual(resp, not_allowed)
self._set_service_stub('_file_is_rbd', True)
+ def test_piped_execute(self):
+ self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
+ self._setup_mock_popen(self, ['out', 'err'])
+ self.service._piped_execute(['foo'], ['bar'])
+ self.assertEquals(self.called, ['popen_init', 'popen_init',
+ 'stdout_close', 'communicate'])
+
def tearDown(self):
self.volume_file.close()
self.stubs.UnsetAll()