mock_get_active_image_from_info,\
mock.patch.object(drv, '_qemu_img_info') as \
mock_qemu_img_info,\
- mock.patch.object(base_driver.VolumeDriver, 'backup_volume') as \
+ mock.patch.object(base_driver.BaseVD, 'backup_volume') as \
mock_backup_volume:
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
mock_get_active_image_from_info,\
mock.patch.object(drv, '_qemu_img_info') as \
mock_qemu_img_info,\
- mock.patch.object(base_driver.VolumeDriver, 'backup_volume') as \
+ mock.patch.object(base_driver.BaseVD, 'backup_volume') as \
mock_backup_volume:
ctxt = context.RequestContext('fake_user', 'fake_project')
volume = self._simple_volume()
class PureBaseVolumeDriverTestCase(PureDriverTestCase):
+ class fake_pure_base_volume_driver(pure.PureBaseVolumeDriver):
+ def initialize_connection():
+ pass
+
def setUp(self):
super(PureBaseVolumeDriverTestCase, self).setUp()
- self.driver = pure.PureBaseVolumeDriver(configuration=self.mock_config)
+ self.driver = self.fake_pure_base_volume_driver(
+ configuration=self.mock_config)
self.driver._array = self.array
def test_generate_purity_host_name(self):
self.configuration.ssh_max_pool_conn = 5
self.configuration.ssh_conn_timeout = 30
+ class fake_san_driver(san.SanDriver):
+ def initialize_connection():
+ pass
+
+ def create_volume():
+ pass
+
+ def delete_volume():
+ pass
+
+ def terminate_connection():
+ pass
+
@mock.patch.object(san.processutils, 'ssh_execute')
@mock.patch.object(san.ssh_utils, 'SSHPool')
@mock.patch.object(san.utils, 'check_ssh_injection')
def test_ssh_formatted_command(self, mock_check_ssh_injection,
mock_ssh_pool, mock_ssh_execute):
- driver = san.SanDriver(configuration=self.configuration)
+ driver = self.fake_san_driver(configuration=self.configuration)
cmd_list = ['uname', '-s']
expected_cmd = 'uname -s'
driver.san_execute(*cmd_list)
from cinder.image import image_utils
from cinder.openstack.common import fileutils
from cinder import utils
+from cinder.volume import driver
from cinder.volume.drivers import remotefs as remotefs_drv
LOG = logging.getLogger(__name__)
CONF.register_opts(volume_opts)
-class GlusterfsDriver(remotefs_drv.RemoteFSSnapDriver):
+class GlusterfsDriver(remotefs_drv.RemoteFSSnapDriver, driver.CloneableVD,
+ driver.ExtendVD):
"""Gluster based cinder driver. Creates file on Gluster share for using it
as block device on hypervisor.
driver_volume_type = 'glusterfs'
driver_prefix = 'glusterfs'
volume_backend_name = 'GlusterFS'
- VERSION = '1.2.0'
+ VERSION = '1.3.0'
def __init__(self, execute=processutils.execute, *args, **kwargs):
self._remotefsclient = None
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import image_utils
from cinder import utils
+from cinder.volume import driver
from cinder.volume.drivers import remotefs
-VERSION = '1.2.0'
+VERSION = '1.3.0'
LOG = logging.getLogger(__name__)
CONF.register_opts(nfs_opts)
-class NfsDriver(remotefs.RemoteFSDriver):
+class NfsDriver(driver.ExtendVD, remotefs.RemoteFSDriver):
"""NFS based cinder driver. Creates file on NFS share for using it
as block device on hypervisor.
"""
return lvo_inner1
-class RemoteFSDriver(driver.VolumeDriver):
+class RemoteFSDriver(driver.LocalVD, driver.TransferVD, driver.BaseVD):
"""Common base for drivers that work like NFS."""
driver_volume_type = None
LOG.debug('Available shares %s', self._mounted_shares)
- def create_cloned_volume(self, volume, src_vref):
- raise NotImplementedError()
-
def delete_volume(self, volume):
"""Deletes a logical volume.
return nas_option
-class RemoteFSSnapDriver(RemoteFSDriver):
+class RemoteFSSnapDriver(RemoteFSDriver, driver.SnapshotVD):
"""Base class for remotefs drivers implementing qcow2 snapshots.
Driver must implement:
CONF.register_opts(san_opts)
-class SanDriver(driver.VolumeDriver):
+class SanDriver(driver.BaseVD):
"""Base class for SAN-style storage volumes
A SAN-style storage value is 'different' because the volume controller