'initiator_port_wwn_list': ['10008c7cff523b01'],
'target_port_wwn_list': ['20240002ac000a50']}}
+CONF = cfg.CONF
+
class TestBrcdFCSanLookupService(brcd_lookup.BrcdFCSanLookupService,
test.TestCase):
@mock.patch.object(paramiko.hostkeys.HostKeys, 'load')
def test_create_ssh_client(self, load_mock):
- mock_args = {}
- mock_args['known_hosts_file'] = 'dummy_host_key_file'
- mock_args['missing_key_policy'] = paramiko.RejectPolicy()
- ssh_client = self.create_ssh_client(**mock_args)
+ CONF.ssh_hosts_key_file = 'dummy_host_key_file'
+ CONF.strict_ssh_host_key_policy = True
+ ssh_client = self.create_ssh_client()
self.assertEqual(ssh_client._host_keys_filename, 'dummy_host_key_file')
self.assertTrue(isinstance(ssh_client._policy, paramiko.RejectPolicy))
- mock_args = {}
- ssh_client = self.create_ssh_client(**mock_args)
- self.assertIsNone(ssh_client._host_keys_filename)
- self.assertTrue(isinstance(ssh_client._policy, paramiko.WarningPolicy))
+ CONF.strict_ssh_host_key_policy = False
+ ssh_client = self.create_ssh_client()
+ self.assertTrue(isinstance(ssh_client._policy, paramiko.AutoAddPolicy))
@mock.patch.object(brcd_lookup.BrcdFCSanLookupService,
'get_nameserver_info')
#
+from oslo.config import cfg
import paramiko
from cinder import exception
LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
class BrcdFCSanLookupService(FCSanLookupService):
"""The SAN lookup service that talks to Brocade switches.
super(BrcdFCSanLookupService, self).__init__(**kwargs)
self.configuration = kwargs.get('configuration', None)
self.create_configuration()
- self.client = self.create_ssh_client(**kwargs)
+ self.client = self.create_ssh_client()
def create_configuration(self):
"""Configuration specific to SAN context values."""
self.fabric_configs = fabric_opts.load_fabric_configurations(
fabric_names)
- def create_ssh_client(self, **kwargs):
+ def create_ssh_client(self):
ssh_client = paramiko.SSHClient()
- known_hosts_file = kwargs.get('known_hosts_file', None)
- if known_hosts_file is None:
- ssh_client.load_system_host_keys()
+ known_hosts_file = CONF.ssh_hosts_key_file
+ if not known_hosts_file:
+ raise exception.ParameterNotFound(param='ssh_hosts_key_file')
+ ssh_client.load_host_keys(known_hosts_file)
+ if CONF.strict_ssh_host_key_policy:
+ missing_key_policy = paramiko.RejectPolicy()
else:
- ssh_client.load_host_keys(known_hosts_file)
- missing_key_policy = kwargs.get('missing_key_policy', None)
- if missing_key_policy is None:
- missing_key_policy = paramiko.WarningPolicy()
+ missing_key_policy = paramiko.AutoAddPolicy()
ssh_client.set_missing_host_key_policy(missing_key_policy)
return ssh_client