combined_cert = None
def connect(self):
- sock = socket.create_connection((self.host, self.port),
- self.timeout, self.source_address)
+ try:
+ sock = socket.create_connection((self.host, self.port),
+ self.timeout, self.source_address)
+ except AttributeError:
+ # python 2.6 doesn't have the source_address attribute
+ sock = socket.create_connection((self.host, self.port),
+ self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
from oslo.config import cfg
from neutron.manager import NeutronManager
+from neutron.openstack.common import importutils
from neutron.plugins.bigswitch import servermanager
from neutron.tests.unit.bigswitch import test_restproxy_plugin as test_rp
conmock.return_value.request.side_effect = socket.timeout()
resp = sp.servers[0].rest_call('GET', '/')
self.assertEqual(resp, (0, None, None, None))
+
+
+class TestSockets(test_rp.BigSwitchProxyPluginV2TestCase):
+
+ def setUp(self):
+ super(TestSockets, self).setUp()
+ # http patch must not be running or it will mangle the servermanager
+ # import where the https connection classes are defined
+ self.httpPatch.stop()
+ self.sm = importutils.import_module(SERVERMANAGER)
+
+ def test_socket_create_attempt(self):
+ # exercise the socket creation to make sure it works on both python
+ # versions
+ con = self.sm.HTTPSConnectionWithValidation('127.0.0.1', 0, timeout=1)
+ # if httpcon was created, a connect attempt should raise a socket error
+ self.assertRaises(socket.error, con.connect)