import ast
+from oslo.config import cfg
+
from cinder import context
from cinder import exception
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
HP3PAR_CPG = 'OpenStackCPG'
HP3PAR_CPG_SNAP = 'OpenStackCPGSnap'
HP3PAR_USER_NAME = 'testUser'
self.driver.do_setup(None)
return _m_client
+ @mock.patch('hp3parclient.version', "3.0.9")
+ def test_unsupported_client_version(self):
+
+ self.assertRaises(exception.InvalidInput,
+ self.setup_driver)
+
+ @mock.patch('hp3parclient.version', "3.1.0")
+ def test_ssh_options_310(self):
+
+ self.ctxt = context.get_admin_context()
+ mock_client = self.setup_mock_client(driver=hpfcdriver.HP3PARFCDriver)
+ expected = [
+ mock.call.setSSHOptions(
+ HP3PAR_SAN_IP,
+ HP3PAR_USER_NAME,
+ HP3PAR_USER_PASS,
+ privatekey=HP3PAR_SAN_SSH_PRIVATE,
+ port=HP3PAR_SAN_SSH_PORT,
+ conn_timeout=HP3PAR_SAN_SSH_CON_TIMEOUT),
+ mock.call.login(HP3PAR_USER_NAME, HP3PAR_USER_PASS),
+ mock.call.getCPG(HP3PAR_CPG),
+ mock.call.logout()]
+ mock_client.assert_has_calls(expected)
+
+ @mock.patch('hp3parclient.version', "3.1.1")
+ def test_ssh_options(self):
+
+ expected_hosts_key_file = "test_hosts_key_file"
+ orig_ssh_hosts_key_file = CONF.ssh_hosts_key_file
+ orig_strict_ssh_host_key_policy = CONF.strict_ssh_host_key_policy
+ CONF.ssh_hosts_key_file = expected_hosts_key_file
+ CONF.strict_ssh_host_key_policy = False
+
+ self.ctxt = context.get_admin_context()
+ mock_client = self.setup_mock_client(driver=hpfcdriver.HP3PARFCDriver)
+
+ CONF.ssh_hosts_key_file = orig_ssh_hosts_key_file
+ CONF.strict_ssh_host_key_policy = orig_strict_ssh_host_key_policy
+
+ expected = [
+ mock.call.setSSHOptions(
+ HP3PAR_SAN_IP,
+ HP3PAR_USER_NAME,
+ HP3PAR_USER_PASS,
+ privatekey=HP3PAR_SAN_SSH_PRIVATE,
+ known_hosts_file=expected_hosts_key_file,
+ missing_key_policy="AutoAddPolicy",
+ port=HP3PAR_SAN_SSH_PORT,
+ conn_timeout=HP3PAR_SAN_SSH_CON_TIMEOUT),
+ mock.call.login(HP3PAR_USER_NAME, HP3PAR_USER_PASS),
+ mock.call.getCPG(HP3PAR_CPG),
+ mock.call.logout()]
+ mock_client.assert_has_calls(expected)
+
+ @mock.patch('hp3parclient.version', "3.1.1")
+ def test_ssh_options_strict(self):
+
+ expected_hosts_key_file = "test_hosts_key_file"
+ orig_ssh_hosts_key_file = CONF.ssh_hosts_key_file
+ orig_strict_ssh_host_key_policy = CONF.strict_ssh_host_key_policy
+ CONF.ssh_hosts_key_file = expected_hosts_key_file
+ CONF.strict_ssh_host_key_policy = True
+
+ self.ctxt = context.get_admin_context()
+ mock_client = self.setup_mock_client(driver=hpfcdriver.HP3PARFCDriver)
+
+ CONF.ssh_hosts_key_file = orig_ssh_hosts_key_file
+ CONF.strict_ssh_host_key_policy = orig_strict_ssh_host_key_policy
+
+ expected = [
+ mock.call.setSSHOptions(
+ HP3PAR_SAN_IP,
+ HP3PAR_USER_NAME,
+ HP3PAR_USER_PASS,
+ privatekey=HP3PAR_SAN_SSH_PRIVATE,
+ known_hosts_file=expected_hosts_key_file,
+ missing_key_policy="RejectPolicy",
+ port=HP3PAR_SAN_SSH_PORT,
+ conn_timeout=HP3PAR_SAN_SSH_CON_TIMEOUT),
+ mock.call.login(HP3PAR_USER_NAME, HP3PAR_USER_PASS),
+ mock.call.getCPG(HP3PAR_CPG),
+ mock.call.logout()]
+ mock_client.assert_has_calls(expected)
+
def test_task_waiter(self):
task_statuses = [self.STATUS_ACTIVE, self.STATUS_ACTIVE]
LOG = logging.getLogger(__name__)
MIN_CLIENT_VERSION = '3.1.0'
+MIN_CLIENT_SSH_ARGS_VERSION = '3.1.1'
hp3par_opts = [
cfg.StrOpt('hp3par_api_url',
and hp3parclient 3.1.0
2.0.18 - HP 3PAR manage_existing with volume-type support
2.0.19 - Update default persona from Generic to Generic-ALUA
+ 2.0.20 - Configurable SSH missing key policy and known hosts file
"""
- VERSION = "2.0.19"
+ VERSION = "2.0.20"
stats = {}
LOG.error(ex_msg)
raise exception.InvalidInput(reason=ex_msg)
- cl.setSSHOptions(self.config.san_ip,
- self.config.san_login,
- self.config.san_password,
- port=self.config.san_ssh_port,
- conn_timeout=self.config.ssh_conn_timeout,
- privatekey=self.config.san_private_key)
+ if client_version < MIN_CLIENT_SSH_ARGS_VERSION:
+ cl.setSSHOptions(self.config.san_ip,
+ self.config.san_login,
+ self.config.san_password,
+ port=self.config.san_ssh_port,
+ conn_timeout=self.config.ssh_conn_timeout,
+ privatekey=self.config.san_private_key)
+ else:
+ known_hosts_file = CONF.ssh_hosts_key_file
+ policy = "AutoAddPolicy"
+ if CONF.strict_ssh_host_key_policy:
+ policy = "RejectPolicy"
+ cl.setSSHOptions(self.config.san_ip,
+ self.config.san_login,
+ self.config.san_password,
+ port=self.config.san_ssh_port,
+ conn_timeout=self.config.ssh_conn_timeout,
+ privatekey=self.config.san_private_key,
+ missing_key_policy=policy,
+ known_hosts_file=known_hosts_file)
return cl