if not caught:
self.fail('Expected raised exception but nothing caught.')
- def test_set_execute(self):
- drv = self._driver
-
- rfsclient = os_brick.remotefs.remotefs.RemoteFsClient
-
- with mock.patch.object(rfsclient, 'set_execute') as mock_set_execute:
- def my_execute(*a, **k):
- pass
-
- drv.set_execute(my_execute)
-
- mock_set_execute.assert_called_once_with(my_execute)
-
def test_local_path(self):
"""local_path common use case."""
self.override_config("glusterfs_mount_point_base",
self.driver = gpfs.GPFSDriver(configuration=conf.Configuration(None))
self.driver.gpfs_execute = self._execute_wrapper
- self.driver.set_execute(self._execute_wrapper)
+ exec_patcher = mock.patch.object(self.driver, '_execute',
+ self._execute_wrapper)
+ exec_patcher.start()
+ self.addCleanup(exec_patcher.stop)
self.driver._cluster_id = '123456'
self.driver._gpfs_device = '/dev/gpfs'
self.driver._storage_pool = 'system'
self.driver = gpfs.GPFSNFSDriver(configuration=conf.
Configuration(None))
self.driver.gpfs_execute = self._execute_wrapper
- self.driver.set_execute(self._execute_wrapper)
self.context = context.get_admin_context()
self.context.user_id = 'fake'
self.context.project_id = 'fake'
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.check_for_setup_error)
+ exec_patcher = mock.patch.object(self.drv, '_execute',
+ mock.MagicMock())
+ exec_patcher.start()
+ self.addCleanup(exec_patcher.stop)
@mock.patch.object(driver.urllib.request, 'urlopen')
def test_check_for_setup_error_with_urlerror(self, mock_urlopen):
raise AssertionError('unexpected command called: %s' % cmd_string)
def test_activate_vg(self):
- executor = mock.MagicMock()
- self.vg.set_execute(executor)
- self.vg.activate_vg()
- executor.assert_called_once_with('vgchange', '-ay',
- self.configuration.volume_group_name,
- root_helper=self.vg._root_helper,
- run_as_root=True)
+ with mock.patch.object(self.vg, '_execute') as executor:
+ self.vg.activate_vg()
+ executor.assert_called_once_with(
+ 'vgchange', '-ay',
+ self.configuration.volume_group_name,
+ root_helper=self.vg._root_helper,
+ run_as_root=True)
def test_deactivate_vg(self):
- executor = mock.MagicMock()
- self.vg.set_execute(executor)
- self.vg.deactivate_vg()
- executor.assert_called_once_with('vgchange', '-an',
- self.configuration.volume_group_name,
- root_helper=self.vg._root_helper,
- run_as_root=True)
+ with mock.patch.object(self.vg, '_execute') as executor:
+ self.vg.deactivate_vg()
+ executor.assert_called_once_with(
+ 'vgchange', '-an',
+ self.configuration.volume_group_name,
+ root_helper=self.vg._root_helper,
+ run_as_root=True)
def test_destroy_vg(self):
- executor = mock.MagicMock()
- self.vg.set_execute(executor)
- self.vg.destroy_vg()
- executor.assert_called_once_with('vgremove', '-f',
- self.configuration.volume_group_name,
- root_helper=self.vg._root_helper,
- run_as_root=True)
+ with mock.patch.object(self.vg, '_execute') as executor:
+ self.vg.destroy_vg()
+ executor.assert_called_once_with(
+ 'vgremove', '-f',
+ self.configuration.volume_group_name,
+ root_helper=self.vg._root_helper,
+ run_as_root=True)
def test_pv_resize(self):
- executor = mock.MagicMock()
- self.vg.set_execute(executor)
- self.vg.pv_resize('fake-pv', '50G')
- executor.assert_called_once_with('pvresize',
- '--setphysicalvolumesize',
- '50G', 'fake-pv',
- root_helper=self.vg._root_helper,
- run_as_root=True)
+ with mock.patch.object(self.vg, '_execute') as executor:
+ self.vg.pv_resize('fake-pv', '50G')
+ executor.assert_called_once_with(
+ 'pvresize',
+ '--setphysicalvolumesize',
+ '50G', 'fake-pv',
+ root_helper=self.vg._root_helper,
+ run_as_root=True)
def test_extend_thin_pool_nothin(self):
- executor =\
- mock.MagicMock(side_effect=Exception('Unexpected call to execute'))
- self.vg.set_execute(executor)
- thin_calc =\
- mock.MagicMock(
- side_effect=
- Exception('Unexpected call to _calculate_thin_pool_size'))
- self.vg._calculate_thin_pool_size = thin_calc
- self.vg.extend_thin_pool()
+ with mock.patch.object(self.vg, '_execute') as executor:
+ executor.side_effect = AssertionError
+ thin_calc =\
+ mock.MagicMock(
+ side_effect=
+ Exception('Unexpected call to _calculate_thin_pool_size'))
+ self.vg._calculate_thin_pool_size = thin_calc
+ self.vg.extend_thin_pool()
def test_extend_thin_pool_thin(self):
self.stubs.Set(processutils, 'execute', self.fake_execute)
self._driver = srb.SRBDriver(configuration=self.configuration)
# Stub processutils.execute for static methods
self.stubs.Set(processutils, 'execute', self._fake_execute)
- self._driver.set_execute(self._fake_execute)
+ exec_patcher = mock.patch.object(self._driver,
+ '_execute',
+ self._fake_execute)
+ exec_patcher.start()
+ self.addCleanup(exec_patcher.stop)
self._configure_driver()
def test_setup(self):
def _fake_execute(_command, *_args, **_kwargs):
"""Fake _execute."""
return self.output, None
- self.volume.driver.set_execute(_fake_execute)
+ exec_patcher = mock.patch.object(self.volume.driver, '_execute',
+ _fake_execute)
+ exec_patcher.start()
+ self.addCleanup(exec_patcher.stop)
self.volume.driver.set_initialized()
self.addCleanup(self._cleanup)
self.configuration.append_config_values(iser_opts)
utils.setup_tracing(self.configuration.safe_get('trace_flags'))
- self.set_execute(execute)
+ self._execute = execute
self._stats = {}
self.pools = []
raise exception.RemoveExportException(volume=snapshot.id,
reason=ex)
- def set_execute(self, execute):
- self._execute = execute
-
def set_initialized(self):
self._initialized = True
'glusterfs', root_helper, execute,
glusterfs_mount_point_base=self.base)
- def set_execute(self, execute):
- super(GlusterfsDriver, self).set_execute(execute)
- if self._remotefsclient:
- self._remotefsclient.set_execute(execute)
-
def do_setup(self, context):
"""Any initialization the volume driver does while starting."""
super(GlusterfsDriver, self).do_setup(context)
return service
- def set_execute(self, execute):
- self._execute = execute
-
def extend_volume(self, volume, new_size):
"""Extend an existing volume.
self.configuration.san_ssh_port = self.configuration.nas_ssh_port
self.configuration.ibmnas_platform_type = \
self.configuration.ibmnas_platform_type.lower()
+ self._execute = execute
LOG.info(_LI('Initialized driver for IBMNAS Platform: %s.'),
self.configuration.ibmnas_platform_type)
- def set_execute(self, execute):
- self._execute = utils.execute
-
def do_setup(self, context):
"""Any initialization the volume driver does while starting."""
super(IBMNAS_NFSDriver, self).do_setup(context)
self.configuration.append_config_values(na_opts.netapp_img_cache_opts)
self.configuration.append_config_values(na_opts.netapp_nfs_extra_opts)
- def set_execute(self, execute):
- self._execute = execute
-
def do_setup(self, context):
super(NetAppNfsDriver, self).do_setup(context)
self._context = context
self._sparse_copy_volume_data = True
- def set_execute(self, execute):
- super(NfsDriver, self).set_execute(execute)
- if self._remotefsclient:
- self._remotefsclient.set_execute(execute)
-
def do_setup(self, context):
"""Any initialization the volume driver does while starting."""
super(NfsDriver, self).do_setup(context)
self.configuration.safe_get('volume_backend_name') or 'SRB_iSCSI'
self.protocol = 'iSCSI'
- def set_execute(self, execute):
- super(SRBISCSIDriver, self).set_execute(execute)
- if self.target_driver is not None:
- self.target_driver.set_execute(execute)
-
def ensure_export(self, context, volume):
device_path = self._mapper_path(volume)