'initiator_port_wwn_list': ['10008c7cff523b01'],
'target_port_wwn_list': ['20240002ac000a50']}}
-CONF = cfg.CONF
-
class TestBrcdFCSanLookupService(brcd_lookup.BrcdFCSanLookupService,
test.TestCase):
@mock.patch.object(paramiko.hostkeys.HostKeys, 'load')
def test_create_ssh_client(self, load_mock):
- CONF.ssh_hosts_key_file = 'dummy_host_key_file'
- CONF.strict_ssh_host_key_policy = True
- ssh_client = self.create_ssh_client()
+ mock_args = {}
+ mock_args['known_hosts_file'] = 'dummy_host_key_file'
+ mock_args['missing_key_policy'] = paramiko.RejectPolicy()
+ ssh_client = self.create_ssh_client(**mock_args)
self.assertEqual(ssh_client._host_keys_filename, 'dummy_host_key_file')
self.assertTrue(isinstance(ssh_client._policy, paramiko.RejectPolicy))
- CONF.strict_ssh_host_key_policy = False
- ssh_client = self.create_ssh_client()
- self.assertTrue(isinstance(ssh_client._policy, paramiko.AutoAddPolicy))
+ mock_args = {}
+ ssh_client = self.create_ssh_client(**mock_args)
+ self.assertIsNone(ssh_client._host_keys_filename)
+ self.assertTrue(isinstance(ssh_client._policy, paramiko.WarningPolicy))
@mock.patch.object(brcd_lookup.BrcdFCSanLookupService,
'get_nameserver_info')
#
-from oslo.config import cfg
import paramiko
from cinder import exception
LOG = logging.getLogger(__name__)
-CONF = cfg.CONF
-
class BrcdFCSanLookupService(FCSanLookupService):
"""The SAN lookup service that talks to Brocade switches.
super(BrcdFCSanLookupService, self).__init__(**kwargs)
self.configuration = kwargs.get('configuration', None)
self.create_configuration()
- self.client = self.create_ssh_client()
+ self.client = self.create_ssh_client(**kwargs)
def create_configuration(self):
"""Configuration specific to SAN context values."""
self.fabric_configs = fabric_opts.load_fabric_configurations(
fabric_names)
- def create_ssh_client(self):
+ def create_ssh_client(self, **kwargs):
ssh_client = paramiko.SSHClient()
- known_hosts_file = CONF.ssh_hosts_key_file
- if not known_hosts_file:
- raise exception.ParameterNotFound(param='ssh_hosts_key_file')
- ssh_client.load_host_keys(known_hosts_file)
- if CONF.strict_ssh_host_key_policy:
- missing_key_policy = paramiko.RejectPolicy()
+ known_hosts_file = kwargs.get('known_hosts_file', None)
+ if known_hosts_file is None:
+ ssh_client.load_system_host_keys()
else:
- missing_key_policy = paramiko.AutoAddPolicy()
+ ssh_client.load_host_keys(known_hosts_file)
+ missing_key_policy = kwargs.get('missing_key_policy', None)
+ if missing_key_policy is None:
+ missing_key_policy = paramiko.WarningPolicy()
ssh_client.set_missing_host_key_policy(missing_key_policy)
return ssh_client