]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
rbd: simplify configuration and use librbd and librados
authorJosh Durgin <josh.durgin@inktank.com>
Wed, 22 May 2013 00:49:02 +0000 (17:49 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Thu, 30 May 2013 20:26:24 +0000 (13:26 -0700)
Add an rbd_ceph_conf options to mirror glance configuration, and use
the existing rbd_user option to choose how to connect to the cluster
instead of relying on an environment variable.  Use these settings
when running command line programs and when connecting via librados.

Use absolute imports so that importing the python librbd bindings
via 'import rbd' does not try to import cinder.drivers.rbd again.

Create some convenience wrappers to simplify librbd and librados
error handling and cleanup. Using these everywhere also simplifies
testing. Mock out all the librados and librbd calls in the tests
so these libraries don't need to be installed.

Remove the local_path() method since it's never used. It was
left over from nova-volume.

There are only three things still relying on the command line:
- importing an image
- exporting to an image
- getting monitor addresses

Importing and exporting on the command line include zero-detection
that would be little benefit to replicate here. librados and librbd
don't have a simple interface to obtain the monitor addresses, so
leave that to a command line tool as well.

Fixes: bug 1083540
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
Change-Id: I32d059c5e460c2dd8423119b3dbe4a9921f5e907

cinder/tests/test_rbd.py
cinder/volume/drivers/rbd.py

index 4942d02417c02fa1bf0e6fe8e60938b686bce9b9..d36673b07031b427bdeda88f439ed2dcccf74f34 100644 (file)
@@ -29,8 +29,7 @@ from cinder import test
 from cinder.tests.image import fake as fake_image
 from cinder.tests.test_volume import DriverTestCase
 from cinder.volume import configuration as conf
-from cinder.volume.drivers.rbd import RBDDriver
-from cinder.volume.drivers.rbd import VERSION as DRIVER_VERSION
+import cinder.volume.drivers.rbd as driver
 
 LOG = logging.getLogger(__name__)
 
@@ -67,37 +66,14 @@ class FakeImageService:
     def download(self, context, image_id, path):
         pass
 
-RADOS_DF_OUT = """
-{
-   "total_space" : "958931232",
-   "total_used" : "123906196",
-   "total_objects" : "4221",
-   "total_avail" : "787024012",
-   "pools" : [
-      {
-         "name" : "volumes",
-         "categories" : [
-            {
-               "write_bytes" : "226833",
-               "size_kb" : "17038386",
-               "read_bytes" : "221865",
-               "num_objects" : "4186",
-               "name" : "",
-               "size_bytes" : "17447306589",
-               "write_kb" : "20302730",
-               "num_object_copies" : "8372",
-               "read_kb" : "30",
-               "num_objects_unfound" : "0",
-               "num_object_clones" : "9",
-               "num_objects_missing_on_primary" : "0",
-               "num_objects_degraded" : "0"
-            }
-         ],
-         "id" : "4"
-      }
-   ]
-}
-"""
+
+class TestUtil(test.TestCase):
+    def test_ascii_str(self):
+        self.assertEqual(None, driver.ascii_str(None))
+        self.assertEqual('foo', driver.ascii_str('foo'))
+        self.assertEqual('foo', driver.ascii_str(u'foo'))
+        self.assertRaises(UnicodeEncodeError,
+                          driver.ascii_str, 'foo' + unichr(300))
 
 
 class RBDTestCase(test.TestCase):
@@ -107,17 +83,114 @@ class RBDTestCase(test.TestCase):
 
         def fake_execute(*args, **kwargs):
             return '', ''
-        self._mox = mox.Mox()
         self.configuration = mox.MockObject(conf.Configuration)
         self.configuration.volume_tmp_dir = None
         self.configuration.rbd_pool = 'rbd'
+        self.configuration.rbd_ceph_conf = None
         self.configuration.rbd_secret_uuid = None
         self.configuration.rbd_user = None
         self.configuration.append_config_values(mox.IgnoreArg())
 
-        self.driver = RBDDriver(execute=fake_execute,
-                                configuration=self.configuration)
-        self._mox.ReplayAll()
+        self.rados = self.mox.CreateMockAnything()
+        self.rbd = self.mox.CreateMockAnything()
+        self.driver = driver.RBDDriver(execute=fake_execute,
+                                       configuration=self.configuration,
+                                       rados=self.rados,
+                                       rbd=self.rbd)
+
+    def test_create_volume(self):
+        name = u'volume-00000001'
+        size = 1
+        volume = dict(name=name, size=size)
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_rbd = self.mox.CreateMockAnything()
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.create(mox.IgnoreArg(), str(name), size * 1024 ** 3,
+                        old_format=False,
+                        features=self.rbd.RBD_FEATURE_LAYERING)
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_volume(volume)
+
+    def test_delete_volume(self):
+        name = u'volume-00000001'
+        volume = dict(name=name)
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        mock_rbd = self.mox.CreateMockAnything()
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.remove(mox.IgnoreArg(), str(name))
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.delete_volume(volume)
+
+    def test_create_snapshot(self):
+        vol_name = u'volume-00000001'
+        snap_name = u'snapshot-name'
+        snapshot = dict(volume_name=vol_name, name=snap_name)
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, vol_name) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.create_snap(str(snap_name))
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_proxy.protect_snap(str(snap_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_snapshot(snapshot)
+
+    def test_delete_snapshot(self):
+        vol_name = u'volume-00000001'
+        snap_name = u'snapshot-name'
+        snapshot = dict(volume_name=vol_name, name=snap_name)
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, vol_name) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_proxy.unprotect_snap(str(snap_name))
+        mock_proxy.remove_snap(str(snap_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.delete_snapshot(snapshot)
+
+    def test_create_cloned_volume(self):
+        src_name = u'volume-00000001'
+        dst_name = u'volume-00000002'
+        mock_proxy = self.mox.CreateMockAnything()
+        mock_proxy.ioctx = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, src_name, read_only=True) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.copy(mock_proxy.ioctx, str(dst_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_cloned_volume(dict(name=dst_name),
+                                         dict(name=src_name))
 
     def test_good_locations(self):
         locations = ['rbd://fsid/pool/image/snap',
@@ -141,6 +214,18 @@ class RBDTestCase(test.TestCase):
     def test_cloneable(self):
         self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
         location = 'rbd://abc/pool/image/snap'
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, 'image',
+                              pool='pool',
+                              snapshot='snap',
+                              read_only=True).AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
         self.assertTrue(self.driver._is_cloneable(location))
 
     def test_uncloneable_different_fsid(self):
@@ -149,11 +234,18 @@ class RBDTestCase(test.TestCase):
         self.assertFalse(self.driver._is_cloneable(location))
 
     def test_uncloneable_unreadable(self):
-        def fake_exc(*args):
-            raise exception.ProcessExecutionError()
         self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
-        self.stubs.Set(self.driver, '_execute', fake_exc)
         location = 'rbd://abc/pool/image/snap'
+        self.stubs.Set(self.rbd, 'Error', test.TestingException)
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, 'image',
+                              pool='pool',
+                              snapshot='snap',
+                              read_only=True).AndRaise(test.TestingException)
+
+        self.mox.ReplayAll()
+
         self.assertFalse(self.driver._is_cloneable(location))
 
     def _copy_image(self):
@@ -166,6 +258,8 @@ class RBDTestCase(test.TestCase):
         self.stubs.Set(tempfile, 'NamedTemporaryFile', fake_temp_file)
         self.stubs.Set(os.path, 'exists', lambda x: True)
         self.stubs.Set(image_utils, 'fetch_to_raw', lambda w, x, y, z: None)
+        self.stubs.Set(self.driver, 'delete_volume', lambda x: None)
+        self.stubs.Set(self.driver, '_resize', lambda x: None)
         self.driver.copy_image_to_volume(None, {'name': 'test',
                                                 'size': 1},
                                          FakeImageService(), None)
@@ -179,38 +273,52 @@ class RBDTestCase(test.TestCase):
         self._copy_image()
 
     def test_update_volume_stats(self):
-        def fake_stats(*args):
-            return RADOS_DF_OUT, ''
-
-        def fake_safe_get(*args):
-            return "RBD"
+        self.stubs.Set(self.driver.configuration, 'safe_get', lambda x: 'RBD')
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.mox.StubOutWithMock(mock_client, 'cluster')
+        mock_client.cluster.get_cluster_stats().AndReturn(dict(
+            kb=1234567890,
+            kb_used=4567890,
+            kb_avail=1000000000,
+            num_objects=4683))
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
 
-        self.stubs.Set(self.driver, '_execute', fake_stats)
-        self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
         expected = dict(
             volume_backend_name='RBD',
             vendor_name='Open Source',
-            driver_version=DRIVER_VERSION,
+            driver_version=driver.VERSION,
             storage_protocol='ceph',
-            total_capacity_gb=914,
-            free_capacity_gb=750,
+            total_capacity_gb=1177,
+            free_capacity_gb=953,
             reserved_percentage=0)
         actual = self.driver.get_volume_stats(True)
         self.assertDictMatch(expected, actual)
 
     def test_update_volume_stats_error(self):
-        def fake_exc(*args):
-            raise exception.ProcessExecutionError()
+        self.stubs.Set(self.driver.configuration, 'safe_get', lambda x: 'RBD')
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
 
-        def fake_safe_get(*args):
-            return "RBD"
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.mox.StubOutWithMock(mock_client, 'cluster')
+        self.stubs.Set(self.rados, 'Error', test.TestingException)
+        mock_client.cluster.get_cluster_stats().AndRaise(test.TestingException)
+        mock_client.__exit__(test.TestingException,
+                             mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(None)
+
+        self.mox.ReplayAll()
 
-        self.stubs.Set(self.driver, '_execute', fake_exc)
-        self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
         expected = dict(
             volume_backend_name='RBD',
             vendor_name='Open Source',
-            driver_version=DRIVER_VERSION,
+            driver_version=driver.VERSION,
             storage_protocol='ceph',
             total_capacity_gb='unknown',
             free_capacity_gb='unknown',
@@ -246,6 +354,91 @@ class RBDTestCase(test.TestCase):
         actual = self.driver.initialize_connection(dict(name=name), None)
         self.assertDictMatch(expected, actual)
 
+    def test_clone(self):
+        name = u'volume-00000001'
+        volume = dict(name=name)
+        src_pool = u'images'
+        src_image = u'image-name'
+        src_snap = u'snapshot-name'
+        mock_src_client = self.mox.CreateMockAnything()
+        mock_dst_client = self.mox.CreateMockAnything()
+        mock_rbd = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver, src_pool).AndReturn(mock_src_client)
+        mock_src_client.__enter__().AndReturn(mock_src_client)
+        driver.RADOSClient(self.driver).AndReturn(mock_dst_client)
+        mock_dst_client.__enter__().AndReturn(mock_dst_client)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.clone(mox.IgnoreArg(),
+                       str(src_image),
+                       str(src_snap),
+                       mox.IgnoreArg(),
+                       str(name),
+                       features=self.rbd.RBD_FEATURE_LAYERING)
+        mock_dst_client.__exit__(None, None, None).AndReturn(None)
+        mock_src_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver._clone(volume, src_pool, src_image, src_snap)
+
+    def test_rbd_volume_proxy_init(self):
+        name = u'volume-00000001'
+        snap = u'snapshot-name'
+        self.stubs.Set(self.driver, '_connect_to_rados',
+                       lambda x: (None, None))
+        self.mox.StubOutWithMock(self.driver, '_disconnect_from_rados')
+
+        # no snapshot
+        self.rbd.Image(None, str(name), snapshot=None, read_only=False) \
+                .AndReturn(None)
+        # snapshot
+        self.rbd.Image(None, str(name), snapshot=str(snap), read_only=True) \
+                .AndReturn(None)
+        # error causes disconnect
+        self.stubs.Set(self.rbd, 'Error', test.TestingException)
+        self.rbd.Image(None, str(name), snapshot=None, read_only=False) \
+                .AndRaise(test.TestingException)
+        self.driver._disconnect_from_rados(None, None)
+
+        self.mox.ReplayAll()
+
+        driver.RBDVolumeProxy(self.driver, name)
+        driver.RBDVolumeProxy(self.driver, name, snapshot=snap, read_only=True)
+        self.assertRaises(test.TestingException,
+                          driver.RBDVolumeProxy, self.driver, name)
+
+    def test_connect_to_rados(self):
+        mock_client = self.mox.CreateMockAnything()
+        mock_ioctx = self.mox.CreateMockAnything()
+        self.stubs.Set(self.rados, 'Error', test.TestingException)
+
+        # default configured pool
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('rbd').AndReturn(mock_ioctx)
+
+        # different pool
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('images').AndReturn(mock_ioctx)
+
+        # error
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('rbd').AndRaise(test.TestingException)
+        mock_client.shutdown()
+
+        self.mox.ReplayAll()
+
+        self.assertEqual((mock_client, mock_ioctx),
+                         self.driver._connect_to_rados())
+        self.assertEqual((mock_client, mock_ioctx),
+                         self.driver._connect_to_rados('images'))
+        self.assertRaises(test.TestingException, self.driver._connect_to_rados)
+
 
 class ManagedRBDTestCase(DriverTestCase):
     driver_name = "cinder.volume.drivers.rbd.RBDDriver"
index 1a06961ad23aff1a4ae0cfea6b55d75f803d06be..c765ecfcf3edbc77c28971af674c7c1c40fe4c7d 100644 (file)
@@ -15,6 +15,8 @@
 RADOS Block Device Driver
 """
 
+from __future__ import absolute_import
+
 import json
 import os
 import tempfile
@@ -28,6 +30,13 @@ from cinder.openstack.common import log as logging
 from cinder import utils
 from cinder.volume import driver
 
+try:
+    import rados
+    import rbd
+except ImportError:
+    rados = None
+    rbd = None
+
 LOG = logging.getLogger(__name__)
 
 rbd_opts = [
@@ -36,7 +45,11 @@ rbd_opts = [
                help='the RADOS pool in which rbd volumes are stored'),
     cfg.StrOpt('rbd_user',
                default=None,
-               help='the RADOS client name for accessing rbd volumes'),
+               help='the RADOS client name for accessing rbd volumes '
+                    '- only set when using cephx authentication'),
+    cfg.StrOpt('rbd_ceph_conf',
+               default='',  # default determined by librados
+               help='path to the ceph configuration file to use'),
     cfg.StrOpt('rbd_secret_uuid',
                default=None,
                help='the libvirt uuid of the secret for the rbd_user'
@@ -46,7 +59,72 @@ rbd_opts = [
                help='where to store temporary image files if the volume '
                     'driver does not write them directly to the volume'), ]
 
-VERSION = '1.0'
+VERSION = '1.1'
+
+
+def ascii_str(string):
+    """
+    Convert a string to ascii, or return None if the input is None.
+
+    This is useful where a parameter may be None by default, or a
+    string. librbd only accepts ascii, hence the need for conversion.
+    """
+    if string is None:
+        return string
+    return str(string)
+
+
+class RBDVolumeProxy(object):
+    """
+    Context manager for dealing with an existing rbd volume.
+
+    This handles connecting to rados and opening an ioctx automatically,
+    and otherwise acts like a librbd Image object.
+
+    The underlying librados client and ioctx can be accessed as
+    the attributes 'client' and 'ioctx'.
+    """
+    def __init__(self, driver, name, pool=None, snapshot=None,
+                 read_only=False):
+        client, ioctx = driver._connect_to_rados(pool)
+        try:
+            self.volume = driver.rbd.Image(ioctx, str(name),
+                                           snapshot=ascii_str(snapshot),
+                                           read_only=read_only)
+        except driver.rbd.Error:
+            LOG.exception(_("error opening rbd image %s"), name)
+            driver._disconnect_from_rados(client, ioctx)
+            raise
+        self.driver = driver
+        self.client = client
+        self.ioctx = ioctx
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        try:
+            self.volume.close()
+        finally:
+            self.driver._disconnect_from_rados(self.client, self.ioctx)
+
+    def __getattr__(self, attrib):
+        return getattr(self.volume, attrib)
+
+
+class RADOSClient(object):
+    """
+    Context manager to simplify error handling for connecting to ceph
+    """
+    def __init__(self, driver, pool=None):
+        self.driver = driver
+        self.cluster, self.ioctx = driver._connect_to_rados(pool)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        self.driver._disconnect_from_rados(self.cluster, self.ioctx)
 
 
 class RBDDriver(driver.VolumeDriver):
@@ -55,18 +133,52 @@ class RBDDriver(driver.VolumeDriver):
         super(RBDDriver, self).__init__(*args, **kwargs)
         self.configuration.append_config_values(rbd_opts)
         self._stats = {}
+        # allow overrides for testing
+        self.rados = kwargs.get('rados', rados)
+        self.rbd = kwargs.get('rbd', rbd)
 
     def check_for_setup_error(self):
         """Returns an error if prerequisites aren't met"""
-        (stdout, stderr) = self._execute('rados', 'lspools')
-        pools = stdout.split("\n")
-        if self.configuration.rbd_pool not in pools:
-            exception_message = (_("rbd has no pool %s") %
-                                 self.configuration.rbd_pool)
-            raise exception.VolumeBackendAPIException(data=exception_message)
+        if rados is None:
+            msg = _('rados and rbd python libraries not found')
+            raise exception.VolumeBackendAPIException(data=msg)
+        try:
+            with RADOSClient(self):
+                pass
+        except self.rados.Error:
+            msg = _('error connecting to ceph cluster')
+            LOG.exception(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+    def _ceph_args(self):
+        args = []
+        if self.configuration.rbd_user:
+            args.extend(['--id', self.configuration.rbd_user])
+        if self.configuration.rbd_ceph_conf:
+            args.extend(['--conf', self.configuration.rbd_ceph_conf])
+        return args
+
+    def _connect_to_rados(self, pool=None):
+        ascii_user = ascii_str(self.configuration.rbd_user)
+        ascii_conf = ascii_str(self.configuration.rbd_ceph_conf)
+        client = self.rados.Rados(rados_id=ascii_user, conffile=ascii_conf)
+        try:
+            client.connect()
+            pool_to_open = str(pool or self.configuration.rbd_pool)
+            ioctx = client.open_ioctx(pool_to_open)
+            return client, ioctx
+        except self.rados.Error:
+            # shutdown cannot raise an exception
+            client.shutdown()
+            raise
+
+    def _disconnect_from_rados(self, client, ioctx):
+        # closing an ioctx cannot raise an exception
+        ioctx.close()
+        client.shutdown()
 
     def _get_mon_addrs(self):
-        args = ['ceph', 'mon', 'dump', '--format=json']
+        args = ['ceph', 'mon', 'dump', '--format=json'] + self._ceph_args()
         out, _ = self._execute(*args)
         lines = out.split('\n')
         if lines[0].startswith('dumped monmap epoch'):
@@ -93,13 +205,11 @@ class RBDDriver(driver.VolumeDriver):
         stats['volume_backend_name'] = backend_name or 'RBD'
 
         try:
-            stdout, _err = self._execute('rados', 'df', '--format', 'json')
-            new_stats = json.loads(stdout)
-            total = int(new_stats['total_space']) / 1024 ** 2
-            free = int(new_stats['total_avail']) / 1024 ** 2
-            stats['total_capacity_gb'] = total
-            stats['free_capacity_gb'] = free
-        except exception.ProcessExecutionError:
+            with RADOSClient(self) as client:
+                new_stats = client.cluster.get_cluster_stats()
+            stats['total_capacity_gb'] = new_stats['kb'] / 1024 ** 2
+            stats['free_capacity_gb'] = new_stats['kb_avail'] / 1024 ** 2
+        except self.rados.Error:
             # just log and return unknown capacities
             LOG.exception(_('error refreshing volume stats'))
         self._stats = stats
@@ -112,45 +222,50 @@ class RBDDriver(driver.VolumeDriver):
         return self._stats
 
     def _supports_layering(self):
-        stdout, _ = self._execute('rbd', '--help')
-        return 'clone' in stdout
+        return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
 
     def create_cloned_volume(self, volume, src_vref):
         """Clone a logical volume"""
-        self._try_execute('rbd', 'cp',
-                          '--pool', self.configuration.rbd_pool,
-                          '--image', src_vref['name'],
-                          '--dest-pool', self.configuration.rbd_pool,
-                          '--dest', volume['name'])
+        with RBDVolumeProxy(self, src_vref['name'], read_only=True) as vol:
+            vol.copy(vol.ioctx, str(volume['name']))
 
     def create_volume(self, volume):
         """Creates a logical volume."""
         if int(volume['size']) == 0:
-            size = 100
+            size = 100 * 1024 ** 2
         else:
-            size = int(volume['size']) * 1024
-        args = ['rbd', 'create',
-                '--pool', self.configuration.rbd_pool,
-                '--size', size,
-                volume['name']]
+            size = int(volume['size']) * 1024 ** 3
+
+        old_format = True
+        features = 0
         if self._supports_layering():
-            args += ['--new-format']
-        self._try_execute(*args)
+            old_format = False
+            features = self.rbd.RBD_FEATURE_LAYERING
+
+        with RADOSClient(self) as client:
+            self.rbd.RBD().create(client.ioctx,
+                                  str(volume['name']),
+                                  size,
+                                  old_format=old_format,
+                                  features=features)
 
     def _clone(self, volume, src_pool, src_image, src_snap):
-        self._try_execute('rbd', 'clone',
-                          '--pool', src_pool,
-                          '--image', src_image,
-                          '--snap', src_snap,
-                          '--dest-pool', self.configuration.rbd_pool,
-                          '--dest', volume['name'])
+        LOG.debug(_('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s') %
+                  dict(pool=src_pool, img=src_image, snap=src_snap,
+                       dst=volume['name']))
+        with RADOSClient(self, src_pool) as src_client:
+            with RADOSClient(self) as dest_client:
+                self.rbd.RBD().clone(src_client.ioctx,
+                                     str(src_image),
+                                     str(src_snap),
+                                     dest_client.ioctx,
+                                     str(volume['name']),
+                                     features=self.rbd.RBD_FEATURE_LAYERING)
 
     def _resize(self, volume):
-        size = int(volume['size']) * 1024
-        self._try_execute('rbd', 'resize',
-                          '--pool', self.configuration.rbd_pool,
-                          '--image', volume['name'],
-                          '--size', size)
+        size = int(volume['size']) * 1024 ** 3
+        with RBDVolumeProxy(self, volume['name']) as vol:
+            vol.resize(size)
 
     def create_volume_from_snapshot(self, volume, snapshot):
         """Creates a volume from a snapshot."""
@@ -161,47 +276,30 @@ class RBDDriver(driver.VolumeDriver):
 
     def delete_volume(self, volume):
         """Deletes a logical volume."""
-        stdout, _ = self._execute('rbd', 'snap', 'ls',
-                                  '--pool', self.configuration.rbd_pool,
-                                  volume['name'])
-        if stdout.count('\n') > 1:
-            raise exception.VolumeIsBusy(volume_name=volume['name'])
-        self._try_execute('rbd', 'rm',
-                          '--pool', self.configuration.rbd_pool,
-                          volume['name'])
+        with RADOSClient(self) as client:
+            try:
+                self.rbd.RBD().remove(client.ioctx, str(volume['name']))
+            except self.rbd.ImageHasSnapshots:
+                raise exception.VolumeIsBusy(volume_name=volume['name'])
 
     def create_snapshot(self, snapshot):
         """Creates an rbd snapshot"""
-        self._try_execute('rbd', 'snap', 'create',
-                          '--pool', self.configuration.rbd_pool,
-                          '--snap', snapshot['name'],
-                          snapshot['volume_name'])
-        if self._supports_layering():
-            self._try_execute('rbd', 'snap', 'protect',
-                              '--pool', self.configuration.rbd_pool,
-                              '--snap', snapshot['name'],
-                              snapshot['volume_name'])
+        with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
+            snap = str(snapshot['name'])
+            volume.create_snap(snap)
+            if self._supports_layering():
+                volume.protect_snap(snap)
 
     def delete_snapshot(self, snapshot):
         """Deletes an rbd snapshot"""
-        if self._supports_layering():
-            try:
-                self._try_execute('rbd', 'snap', 'unprotect',
-                                  '--pool', self.configuration.rbd_pool,
-                                  '--snap', snapshot['name'],
-                                  snapshot['volume_name'])
-            except exception.ProcessExecutionError:
-                raise exception.SnapshotIsBusy(snapshot_name=snapshot['name'])
-        self._try_execute('rbd', 'snap', 'rm',
-                          '--pool', self.configuration.rbd_pool,
-                          '--snap', snapshot['name'],
-                          snapshot['volume_name'])
-
-    def local_path(self, volume):
-        """Returns the path of the rbd volume."""
-        # This is the same as the remote path
-        # since qemu accesses it directly.
-        return "rbd:%s/%s" % (self.configuration.rbd_pool, volume['name'])
+        with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
+            snap = str(snapshot['name'])
+            if self._supports_layering():
+                try:
+                    volume.unprotect_snap(snap)
+                except self.rbd.ImageBusy:
+                    raise exception.SnapshotIsBusy(snapshot_name=snap)
+            volume.remove_snap(snap)
 
     def ensure_export(self, context, volume):
         """Synchronously recreates an export for a logical volume."""
@@ -217,19 +315,20 @@ class RBDDriver(driver.VolumeDriver):
 
     def initialize_connection(self, volume, connector):
         hosts, ports = self._get_mon_addrs()
-        return {
+        data = {
             'driver_volume_type': 'rbd',
             'data': {
                 'name': '%s/%s' % (self.configuration.rbd_pool,
                                    volume['name']),
                 'hosts': hosts,
                 'ports': ports,
-                'auth_enabled': (self.configuration.rbd_secret_uuid
-                                 is not None),
+                'auth_enabled': (self.configuration.rbd_user is not None),
                 'auth_username': self.configuration.rbd_user,
                 'secret_type': 'ceph',
                 'secret_uuid': self.configuration.rbd_secret_uuid, }
         }
+        LOG.debug(_('connection data: %s'), data)
+        return data
 
     def terminate_connection(self, volume, connector, **kwargs):
         pass
@@ -249,13 +348,14 @@ class RBDDriver(driver.VolumeDriver):
         return pieces
 
     def _get_fsid(self):
-        stdout, _ = self._execute('ceph', 'fsid')
-        return stdout.rstrip('\n')
+        with RADOSClient(self) as client:
+            return client.cluster.get_fsid()
 
     def _is_cloneable(self, image_location):
         try:
             fsid, pool, image, snapshot = self._parse_location(image_location)
-        except exception.ImageUnacceptable:
+        except exception.ImageUnacceptable as e:
+            LOG.debug(_('not cloneable: %s'), e)
             return False
 
         if self._get_fsid() != fsid:
@@ -265,16 +365,16 @@ class RBDDriver(driver.VolumeDriver):
 
         # check that we can read the image
         try:
-            self._execute('rbd', 'info',
-                          '--pool', pool,
-                          '--image', image,
-                          '--snap', snapshot)
-        except exception.ProcessExecutionError:
-            LOG.debug(_('Unable to read image %s') % image_location)
+            with RBDVolumeProxy(self, image,
+                                pool=pool,
+                                snapshot=snapshot,
+                                read_only=True):
+                return True
+        except self.rbd.Error as e:
+            LOG.debug(_('Unable to open image %(loc)s: %(err)s') %
+                      dict(loc=image_location, err=e))
             return False
 
-        return True
-
     def clone_image(self, volume, image_location):
         if image_location is None or not self._is_cloneable(image_location):
             return False
@@ -289,25 +389,23 @@ class RBDDriver(driver.VolumeDriver):
             os.makedirs(tmp_dir)
 
     def copy_image_to_volume(self, context, volume, image_service, image_id):
-        # TODO(jdurgin): replace with librbd
-        # this is a temporary hack, since rewriting this driver
-        # to use librbd would take too long
         self._ensure_tmp_exists()
         tmp_dir = self.configuration.volume_tmp_dir
 
         with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp:
             image_utils.fetch_to_raw(context, image_service, image_id,
                                      tmp.name)
-            # import creates the image, so we must remove it first
-            self._try_execute('rbd', 'rm',
-                              '--pool', self.configuration.rbd_pool,
-                              volume['name'])
 
+            self.delete_volume(volume)
+
+            # keep using the command line import instead of librbd since it
+            # detects zeroes to preserve sparseness in the image
             args = ['rbd', 'import',
                     '--pool', self.configuration.rbd_pool,
                     tmp.name, volume['name']]
             if self._supports_layering():
                 args += ['--new-format']
+            args += self._ceph_args()
             self._try_execute(*args)
         self._resize(volume)
 
@@ -318,9 +416,11 @@ class RBDDriver(driver.VolumeDriver):
         tmp_file = os.path.join(tmp_dir,
                                 volume['name'] + '-' + image_meta['id'])
         with utils.remove_path_on_error(tmp_file):
-            self._try_execute('rbd', 'export',
-                              '--pool', self.configuration.rbd_pool,
-                              volume['name'], tmp_file)
+            args = ['rbd', 'export',
+                    '--pool', self.configuration.rbd_pool,
+                    volume['name'], tmp_file]
+            args += self._ceph_args()
+            self._try_execute(*args)
             image_utils.upload_volume(context, image_service,
                                       image_meta, tmp_file)
         os.unlink(tmp_file)