import os.path
import string
+import time
from cinder.brick import exception
from cinder.brick.initiator import connector
def test_disconnect_volume(self):
self.connector = connector.InitiatorConnector()
self.assertRaises(NotImplementedError,
- self.connector.connect_volume, None)
+ self.connector.disconnect_volume, None, None)
def test_factory(self):
obj = connector.InitiatorConnector.factory('iscsi')
connector.InitiatorConnector.factory,
"bogus")
+ def test_check_valid_device_with_wrong_path(self):
+ self.connector = connector.InitiatorConnector()
+ self.stubs.Set(self.connector,
+ '_execute', lambda *args, **kwargs: ("", None))
+ self.assertFalse(self.connector.check_valid_device('/d0v'))
+
+ def test_check_valid_device(self):
+ self.connector = connector.InitiatorConnector()
+ self.stubs.Set(self.connector,
+ '_execute', lambda *args, **kwargs: ("", ""))
+ self.assertTrue(self.connector.check_valid_device('/dev'))
+
+ def test_check_valid_device_with_cmd_error(self):
+ def raise_except(*args, **kwargs):
+ raise putils.ProcessExecutionError
+ self.connector = connector.InitiatorConnector()
+ self.stubs.Set(self.connector,
+ '_execute', raise_except)
+ self.assertFalse(self.connector.check_valid_device('/dev'))
+
class HostDriverTestCase(test.TestCase):
self.assertEqual(expected_commands, self.cmds)
+ def test_connect_volume_with_multipath(self):
+
+ location = '10.0.2.15:3260'
+ name = 'volume-00000001'
+ iqn = 'iqn.2010-10.org.openstack:%s' % name
+ vol = {'id': 1, 'name': name}
+ connection_properties = self.iscsi_connection(vol, location, iqn)
+
+ self.connector_with_multipath =\
+ connector.ISCSIConnector(use_multipath=True)
+ self.stubs.Set(self.connector_with_multipath,
+ '_run_iscsiadm_bare',
+ lambda *args, **kwargs: "%s %s" % (location, iqn))
+ self.stubs.Set(self.connector_with_multipath,
+ '_get_target_portals_from_iscsiadm_output',
+ lambda x: [location])
+ self.stubs.Set(self.connector_with_multipath,
+ '_connect_to_iscsi_portal',
+ lambda x: None)
+ self.stubs.Set(self.connector_with_multipath,
+ '_rescan_iscsi',
+ lambda: None)
+ self.stubs.Set(self.connector_with_multipath,
+ '_rescan_multipath',
+ lambda: None)
+ self.stubs.Set(self.connector_with_multipath,
+ '_get_multipath_device_name',
+ lambda x: 'iqn.2010-10.org.openstack:%s' % name)
+ self.stubs.Set(os.path, 'exists', lambda x: True)
+ result = self.connector_with_multipath.connect_volume(
+ connection_properties['data'])
+ expected_result = {'path': 'iqn.2010-10.org.openstack:volume-00000001',
+ 'type': 'block'}
+ self.assertEqual(result, expected_result)
+
+ def test_connect_volume_with_not_found_device(self):
+ self.stubs.Set(os.path, 'exists', lambda x: False)
+ self.stubs.Set(time, 'sleep', lambda x: None)
+ location = '10.0.2.15:3260'
+ name = 'volume-00000001'
+ iqn = 'iqn.2010-10.org.openstack:%s' % name
+ vol = {'id': 1, 'name': name}
+ connection_info = self.iscsi_connection(vol, location, iqn)
+ self.assertRaises(exception.VolumeDeviceNotFound,
+ self.connector.connect_volume,
+ connection_info['data'])
+
class FibreChannelConnectorTestCase(ConnectorTestCase):
def setUp(self):