from eventlet import greenthread
import mock
-import mox
from oslo_concurrency import processutils
import paramiko
def setUp(self):
super(DellEQLSanISCSIDriverTestCase, self).setUp()
- self.configuration = mox.MockObject(conf.Configuration)
- self.configuration.append_config_values(mox.IgnoreArg())
+ self.configuration = mock.Mock(conf.Configuration)
self.configuration.san_is_local = False
self.configuration.san_ip = "10.0.0.1"
self.configuration.san_login = "foo"
self.configuration.chap_username = 'admin'
self.configuration.chap_password = 'password'
+ self.cmd = 'this is dummy command'
self._context = context.get_admin_context()
self.driver = eqlx.DellEQLSanISCSIDriver(
configuration=self.configuration)
" 7dab76162"]
self.fake_iqn = 'iqn.2003-10.com.equallogic:group01:25366:fakev'
+ self.fake_iqn_return = ['iSCSI target name is %s.' % self.fake_iqn]
self.driver._group_ip = '10.0.1.6'
self.properties = {
'target_discoverd': True,
return self.properties
def test_create_volume(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
- self.driver._eql_execute('volume', 'create', volume['name'],
- "%sG" % (volume['size']), 'pool',
- self.configuration.eqlx_pool,
- 'thin-provision').\
- AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
- self.driver._eql_execute('volume', 'select', volume['name'],
- 'multihost-access', 'enable')
- self.mox.ReplayAll()
- model_update = self.driver.create_volume(volume)
- self.assertEqual(model_update, self._model_update)
+ mock_attrs = {'args': ['volume', 'create', volume['name'],
+ "%sG" % (volume['size']), 'pool',
+ self.configuration.eqlx_pool,
+ 'thin-provision']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.return_value = self.fake_iqn_return
+ model_update = self.driver.create_volume(volume)
+ self.assertEqual(self._model_update, model_update)
def test_delete_volume(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
- self.driver._eql_execute('volume', 'select', volume['name'], 'show')
- self.driver._eql_execute('volume', 'select', volume['name'], 'offline')
- self.driver._eql_execute('volume', 'delete', volume['name'])
- self.mox.ReplayAll()
- self.driver.delete_volume(volume)
+ show_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+ off_attrs = {'args': ['volume', 'select', volume['name'], 'offline']}
+ delete_attrs = {'args': ['volume', 'delete', volume['name']]}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**show_attrs)
+ mock_eql_execute.configure_mock(**off_attrs)
+ mock_eql_execute.configure_mock(**delete_attrs)
+ self.driver.delete_volume(volume)
def test_delete_absent_volume(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1, 'id': self.volid}
- self.driver._eql_execute('volume', 'select', volume['name'], 'show').\
- AndRaise(processutils.ProcessExecutionError(
- stdout='% Error ..... does not exist.\n'))
- self.mox.ReplayAll()
- self.driver.delete_volume(volume)
+ mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.side_effect = processutils.ProcessExecutionError(
+ stdout='% Error ..... does not exist.\n')
+ self.driver.delete_volume(volume)
def test_ensure_export(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 1}
- self.driver._eql_execute('volume', 'select', volume['name'], 'show')
- self.mox.ReplayAll()
- self.driver.ensure_export({}, volume)
+ mock_attrs = {'args': ['volume', 'select', volume['name'], 'show']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ self.driver.ensure_export({}, volume)
def test_create_snapshot(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
snap_name = 'fake_snap_name'
- self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
- 'snapshot', 'create-now').\
- AndReturn(['Snapshot name is %s' % snap_name])
- self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
- 'snapshot', 'rename', snap_name,
- snapshot['name'])
- self.mox.ReplayAll()
- self.driver.create_snapshot(snapshot)
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.return_value = ['Snapshot name is %s' % snap_name]
+ self.driver.create_snapshot(snapshot)
def test_create_volume_from_snapshot(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
volume = {'name': self.volume_name}
- self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
- 'snapshot', 'select', snapshot['name'],
- 'clone', volume['name']).\
- AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
- self.driver._eql_execute('volume', 'select', volume['name'],
- 'multihost-access', 'enable')
- self.mox.ReplayAll()
- model_update = self.driver.create_volume_from_snapshot(volume,
- snapshot)
- self.assertEqual(model_update, self._model_update)
+ mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
+ 'snapshot', 'select', snapshot['name'],
+ 'clone', volume['name']]}
+
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.return_value = self.fake_iqn_return
+ model_update = self.driver.create_volume_from_snapshot(volume,
+ snapshot)
+ self.assertEqual(self._model_update, model_update)
def test_create_cloned_volume(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
src_vref = {'name': 'fake_uuid'}
volume = {'name': self.volume_name}
- self.driver._eql_execute('volume', 'select', src_vref['name'], 'clone',
- volume['name']).\
- AndReturn(['iSCSI target name is %s.' % self.fake_iqn])
- self.driver._eql_execute('volume', 'select', volume['name'],
- 'multihost-access', 'enable')
- self.mox.ReplayAll()
- model_update = self.driver.create_cloned_volume(volume, src_vref)
- self.assertEqual(model_update, self._model_update)
+ mock_attrs = {'args': ['volume', 'select', volume['name'],
+ 'multihost-access', 'enable']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.return_value = self.fake_iqn_return
+ model_update = self.driver.create_cloned_volume(volume, src_vref)
+ self.assertEqual(self._model_update, model_update)
def test_delete_snapshot(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
snapshot = {'name': 'fakesnap', 'volume_name': 'fakevolume_name'}
- self.driver._eql_execute('volume', 'select', snapshot['volume_name'],
- 'snapshot', 'delete', snapshot['name'])
- self.mox.ReplayAll()
- self.driver.delete_snapshot(snapshot)
+ mock_attrs = {'args': ['volume', 'select', snapshot['volume_name'],
+ 'snapshot', 'delete', snapshot['name']]}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ self.driver.delete_snapshot(snapshot)
def test_extend_volume(self):
new_size = '200'
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name, 'size': 100}
- self.driver._eql_execute('volume', 'select', volume['name'],
- 'size', "%sG" % new_size)
- self.mox.ReplayAll()
- self.driver.extend_volume(volume, new_size)
+ mock_attrs = {'args': ['volume', 'select', volume['name'],
+ 'size', "%sG" % new_size]}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ self.driver.extend_volume(volume, new_size)
def test_initialize_connection(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
volume = {'name': self.volume_name}
- self.stubs.Set(self.driver, "_get_iscsi_properties",
- self._fake_get_iscsi_properties)
- self.driver._eql_execute('volume', 'select', volume['name'], 'access',
- 'create', 'initiator',
- self.connector['initiator'],
- 'authmethod', 'chap',
- 'username',
- self.configuration.chap_username)
- self.mox.ReplayAll()
- iscsi_properties = self.driver.initialize_connection(volume,
- self.connector)
- self.assertEqual(iscsi_properties['data'],
- self._fake_get_iscsi_properties(volume))
+ mock_attrs = {'args': ['volume', 'select', volume['name'], 'access',
+ 'create', 'initiator',
+ self.connector['initiator'],
+ 'authmethod', 'chap',
+ 'username',
+ self.configuration.chap_username]}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ with mock.patch.object(self.driver,
+ '_get_iscsi_properties') as mock_iscsi:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_iscsi.return_value = self.properties
+ iscsi_properties = self.driver.initialize_connection(
+ volume, self.connector)
+ self.assertEqual(self._fake_get_iscsi_properties(volume),
+ iscsi_properties['data'])
def test_terminate_connection(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
+ def my_side_effect(*args, **kwargs):
+ if args[4] == 'show':
+ return self.access_record_output
+ else:
+ return ''
volume = {'name': self.volume_name}
- self.driver._eql_execute('volume', 'select', volume['name'], 'access',
- 'show').AndReturn(self.access_record_output)
- self.driver._eql_execute('volume', 'select', volume['name'], 'access',
- 'delete', '1')
- self.mox.ReplayAll()
- self.driver.terminate_connection(volume, self.connector)
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.side_effect = my_side_effect
+ self.driver.terminate_connection(volume, self.connector)
def test_do_setup(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
fake_group_ip = '10.1.2.3'
- for feature in ('confirmation', 'paging', 'events', 'formatoutput'):
- self.driver._eql_execute('cli-settings', feature, 'off')
- self.driver._eql_execute('grpparams', 'show').\
- AndReturn(['Group-Ipaddress: %s' % fake_group_ip])
- self.mox.ReplayAll()
- self.driver.do_setup(self._context)
- self.assertEqual(fake_group_ip, self.driver._group_ip)
+
+ def my_side_effect(*args, **kwargs):
+ if args[0] == 'grpparams':
+ return ['Group-Ipaddress: %s' % fake_group_ip]
+ else:
+ return ''
+
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.side_effect = my_side_effect
+ self.driver.do_setup(self._context)
+ self.assertEqual(self.driver._group_ip, fake_group_ip)
def test_update_volume_stats(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
- self.driver._eql_execute('pool', 'select',
- self.configuration.eqlx_pool, 'show').\
- AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
- self.mox.ReplayAll()
- self.driver._update_volume_stats()
- self.assertEqual(self.driver._stats['total_capacity_gb'], 111.0)
- self.assertEqual(self.driver._stats['free_capacity_gb'], 11.0)
+ mock_attrs = {'args': ['pool', 'select',
+ self.configuration.eqlx_pool, 'show']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.return_value = ['TotalCapacity: 111GB',
+ 'FreeSpace: 11GB']
+ self.driver._update_volume_stats()
+ self.assertEqual(111.0, self.driver._stats['total_capacity_gb'])
+ self.assertEqual(11.0, self.driver._stats['free_capacity_gb'])
def test_get_volume_stats(self):
- self.driver._eql_execute = self.mox.\
- CreateMock(self.driver._eql_execute)
- self.driver._eql_execute('pool', 'select',
- self.configuration.eqlx_pool, 'show').\
- AndReturn(['TotalCapacity: 111GB', 'FreeSpace: 11GB'])
- self.mox.ReplayAll()
- stats = self.driver.get_volume_stats(refresh=True)
- self.assertEqual(stats['total_capacity_gb'], float('111.0'))
- self.assertEqual(stats['free_capacity_gb'], float('11.0'))
- self.assertEqual(stats['vendor_name'], 'Dell')
+ mock_attrs = {'args': ['pool', 'select',
+ self.configuration.eqlx_pool, 'show']}
+ with mock.patch.object(self.driver,
+ '_eql_execute') as mock_eql_execute:
+ mock_eql_execute.configure_mock(**mock_attrs)
+ mock_eql_execute.return_value = ['TotalCapacity: 111GB',
+ 'FreeSpace: 11GB']
+ stats = self.driver.get_volume_stats(refresh=True)
+ self.assertEqual(float('111.0'), stats['total_capacity_gb'])
+ self.assertEqual(float('11.0'), stats['free_capacity_gb'])
+ self.assertEqual('Dell', stats['vendor_name'])
def test_get_space_in_gb(self):
- self.assertEqual(self.driver._get_space_in_gb('123.0GB'), 123.0)
- self.assertEqual(self.driver._get_space_in_gb('123.0TB'), 123.0 * 1024)
- self.assertEqual(self.driver._get_space_in_gb('1024.0MB'), 1.0)
+ self.assertEqual(123.0, self.driver._get_space_in_gb('123.0GB'))
+ self.assertEqual(123.0 * 1024, self.driver._get_space_in_gb('123.0TB'))
+ self.assertEqual(1.0, self.driver._get_space_in_gb('1024.0MB'))
def test_get_output(self):
def _fake_recv(ignore_arg):
return '%s> ' % self.configuration.eqlx_group_name
- chan = self.mox.CreateMock(paramiko.Channel)
- self.stubs.Set(chan, "recv", _fake_recv)
- self.assertEqual(self.driver._get_output(chan), [_fake_recv(None)])
+ chan = mock.Mock(paramiko.Channel)
+ mock_recv = self.mock_object(chan, 'recv')
+ mock_recv.return_value = '%s> ' % self.configuration.eqlx_group_name
+ self.assertEqual([_fake_recv(None)], self.driver._get_output(chan))
def test_get_prefixed_value(self):
lines = ['Line1 passed', 'Line1 failed']
prefix = ['Line1', 'Line2']
expected_output = [' passed', None]
- self.assertEqual(self.driver._get_prefixed_value(lines, prefix[0]),
- expected_output[0])
- self.assertEqual(self.driver._get_prefixed_value(lines, prefix[1]),
- expected_output[1])
+ self.assertEqual(expected_output[0],
+ self.driver._get_prefixed_value(lines, prefix[0]))
+ self.assertEqual(expected_output[1],
+ self.driver._get_prefixed_value(lines, prefix[1]))
def test_ssh_execute(self):
- ssh = self.mox.CreateMock(paramiko.SSHClient)
- chan = self.mox.CreateMock(paramiko.Channel)
- transport = self.mox.CreateMock(paramiko.Transport)
- self.mox.StubOutWithMock(self.driver, '_get_output')
- self.mox.StubOutWithMock(chan, 'invoke_shell')
+ ssh = mock.Mock(paramiko.SSHClient)
+ chan = mock.Mock(paramiko.Channel)
+ transport = mock.Mock(paramiko.Transport)
+ mock_get_output = self.mock_object(self.driver, '_get_output')
+ self.mock_object(chan, 'invoke_shell')
expected_output = ['NoError: test run']
- ssh.get_transport().AndReturn(transport)
- transport.open_session().AndReturn(chan)
+ mock_get_output.return_value = expected_output
+ ssh.get_transport.return_value = transport
+ transport.open_session.return_value = chan
chan.invoke_shell()
- self.driver._get_output(chan).AndReturn(expected_output)
- cmd = 'this is dummy command'
chan.send('stty columns 255' + '\r')
- self.driver._get_output(chan).AndReturn(expected_output)
- chan.send(cmd + '\r')
- self.driver._get_output(chan).AndReturn(expected_output)
+ chan.send(self.cmd + '\r')
chan.close()
- self.mox.ReplayAll()
- self.assertEqual(self.driver._ssh_execute(ssh, cmd), expected_output)
+ self.assertEqual(expected_output,
+ self.driver._ssh_execute(ssh, self.cmd))
def test_ssh_execute_error(self):
- ssh = self.mox.CreateMock(paramiko.SSHClient)
- chan = self.mox.CreateMock(paramiko.Channel)
- transport = self.mox.CreateMock(paramiko.Transport)
- self.mox.StubOutWithMock(self.driver, '_get_output')
- self.mox.StubOutWithMock(ssh, 'get_transport')
- self.mox.StubOutWithMock(chan, 'invoke_shell')
+ ssh = mock.Mock(paramiko.SSHClient)
+ chan = mock.Mock(paramiko.Channel)
+ transport = mock.Mock(paramiko.Transport)
+ mock_get_output = self.mock_object(self.driver, '_get_output')
+ self.mock_object(ssh, 'get_transport')
+ self.mock_object(chan, 'invoke_shell')
expected_output = ['Error: test run', '% Error']
- ssh.get_transport().AndReturn(transport)
- transport.open_session().AndReturn(chan)
+ mock_get_output.return_value = expected_output
+ ssh.get_transport().return_value = transport
+ transport.open_session.return_value = chan
chan.invoke_shell()
- self.driver._get_output(chan).AndReturn(expected_output)
- cmd = 'this is dummy command'
chan.send('stty columns 255' + '\r')
- self.driver._get_output(chan).AndReturn(expected_output)
- chan.send(cmd + '\r')
- self.driver._get_output(chan).AndReturn(expected_output)
+ chan.send(self.cmd + '\r')
chan.close()
- self.mox.ReplayAll()
self.assertRaises(processutils.ProcessExecutionError,
- self.driver._ssh_execute, ssh, cmd)
+ self.driver._ssh_execute, ssh, self.cmd)
@mock.patch.object(greenthread, 'sleep')
def test_ensure_retries(self, _gt_sleep):
num_attempts = 3
self.driver.configuration.eqlx_cli_max_retries = num_attempts
-
self.mock_object(self.driver, '_ssh_execute',
mock.Mock(side_effect=exception.
VolumeBackendAPIException("some error")))