]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Radware LBaaS driver is able to flip to a secondary backend node
authorAvishay Balderman <avishayb@radware.com>
Thu, 29 May 2014 11:22:21 +0000 (14:22 +0300)
committerAvishay Balderman <avishayb@radware.com>
Sun, 1 Jun 2014 13:37:18 +0000 (16:37 +0300)
Change-Id: Ifbfef493d5339f61dcf58dddcc8e3830aaf06bf1
Closes-Bug: #1324131

etc/services.conf
neutron/services/loadbalancer/drivers/radware/driver.py
neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py

index 3a4b538f1b560f2853ab3ce40798e2d3154dc600..f8a6090055241d6ba88fd5f3b6e3fb2865c44dea 100644 (file)
@@ -1,5 +1,6 @@
 [radware]
 #vdirect_address = 0.0.0.0
+#ha_secondary_address=
 #vdirect_user = vDirect
 #vdirect_password = radware
 #service_ha_pair = False
index 0f3d195a862c8e59c84e0bb4cb40918cd3080d81..7f1942562dbfb5bd24757ee50b6bab458ac61ef8 100644 (file)
@@ -20,10 +20,10 @@ import base64
 import copy
 import httplib
 import Queue
-import socket
 import threading
 import time
 
+
 import eventlet
 from oslo.config import cfg
 
@@ -61,6 +61,8 @@ CREATE_SERVICE_HEADER = {'Content-Type':
 driver_opts = [
     cfg.StrOpt('vdirect_address',
                help=_('IP address of vDirect server.')),
+    cfg.StrOpt('ha_secondary_address',
+               help=_('IP address of secondary vDirect server.')),
     cfg.StrOpt('vdirect_user',
                default='vDirect',
                help=_('vDirect user name.')),
@@ -173,8 +175,10 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
         self.l2_l3_setup_params = rad.l2_l3_setup_params
         self.l4_action_name = rad.l4_action_name
         self.actions_to_skip = rad.actions_to_skip
-        vdirect_address = cfg.CONF.radware.vdirect_address
+        vdirect_address = rad.vdirect_address
+        sec_server = rad.ha_secondary_address
         self.rest_client = vDirectRESTClient(server=vdirect_address,
+                                             secondary_server=sec_server,
                                              user=rad.vdirect_user,
                                              password=rad.vdirect_password)
         self.queue = Queue.Queue()
@@ -633,6 +637,7 @@ class vDirectRESTClient:
 
     def __init__(self,
                  server='localhost',
+                 secondary_server=None,
                  user=None,
                  password=None,
                  port=2189,
@@ -640,6 +645,7 @@ class vDirectRESTClient:
                  timeout=5000,
                  base_uri=''):
         self.server = server
+        self.secondary_server = secondary_server
         self.port = port
         self.ssl = ssl
         self.base_uri = base_uri
@@ -651,14 +657,48 @@ class vDirectRESTClient:
             raise r_exc.AuthenticationMissing()
 
         debug_params = {'server': self.server,
+                        'sec_server': self.secondary_server,
                         'port': self.port,
                         'ssl': self.ssl}
         LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
-                  'port=%(port)d, '
-                  'ssl=%(ssl)r'), debug_params)
+                    'secondary server=%(sec_server)s, '
+                    'port=%(port)d, '
+                    'ssl=%(ssl)r'), debug_params)
+
+    def _flip_servers(self):
+        LOG.warning(_('Fliping servers. Current is: %(server)s, '
+                      'switching to %(secondary)s'),
+                    {'server': self.server,
+                     'secondary': self.secondary_server})
+        self.server, self.secondary_server = self.secondary_server, self.server
+
+    def _recover(self, action, resource, data, headers, binary=False):
+        if self.server and self.secondary_server:
+            self._flip_servers()
+            resp = self._call(action, resource, data,
+                              headers, binary)
+            return resp
+        else:
+            LOG.exception(_('REST client is not able to recover '
+                            'since only one vDirect server is '
+                            'configured.'))
+            return -1, None, None, None
 
-    @call_log.log
     def call(self, action, resource, data, headers, binary=False):
+        resp = self._call(action, resource, data, headers, binary)
+        if resp[RESP_STATUS] == -1:
+            LOG.warning(_('vDirect server is not responding (%s).'),
+                        self.server)
+            return self._recover(action, resource, data, headers, binary)
+        elif resp[RESP_STATUS] in (301, 307):
+            LOG.warning(_('vDirect server is not active (%s).'),
+                        self.server)
+            return self._recover(action, resource, data, headers, binary)
+        else:
+            return resp
+
+    @call_log.log
+    def _call(self, action, resource, data, headers, binary=False):
         if resource.startswith('http'):
             uri = resource
         else:
@@ -701,11 +741,11 @@ class vDirectRESTClient:
                 # response was not JSON, ignore the exception
                 pass
             ret = (response.status, response.reason, respstr, respdata)
-        except (socket.timeout, socket.error) as e:
+        except Exception as e:
             log_dict = {'action': action, 'e': e}
             LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
                       log_dict)
-            ret = 0, None, None, None
+            ret = -1, None, None, None
         conn.close()
         return ret
 
@@ -853,7 +893,14 @@ class OperationCompletionHandler(threading.Thread):
 
 def _rest_wrapper(response, success_codes=[202]):
     """Wrap a REST call and make sure a valid status is returned."""
-    if response[RESP_STATUS] not in success_codes:
+    if not response:
+        raise r_exc.RESTRequestFailure(
+            status=-1,
+            reason="Unknown",
+            description="Unknown",
+            success_codes=success_codes
+        )
+    elif response[RESP_STATUS] not in success_codes:
         raise r_exc.RESTRequestFailure(
             status=response[RESP_STATUS],
             reason=response[RESP_REASON],
index d8302495f9c306e43c3b606ef5e4e3ac60e32c78..25900e9a835cbf5aec034c89b14dc1c41e12f05c 100644 (file)
@@ -32,6 +32,7 @@ from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
 from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
 
 GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
+SERVER_DOWN_CODES = (-1, 301, 307)
 
 
 class QueueMock(Queue.Queue):
@@ -43,10 +44,16 @@ class QueueMock(Queue.Queue):
         self.completion_handler(oper)
 
 
+def _recover_function_mock(action, resource, data, headers, binary=False):
+    pass
+
+
 def rest_call_function_mock(action, resource, data, headers, binary=False):
     if rest_call_function_mock.RESPOND_WITH_ERROR:
         return 400, 'error_status', 'error_description', None
-
+    if rest_call_function_mock.RESPOND_WITH_SERVER_DOWN in SERVER_DOWN_CODES:
+        val = rest_call_function_mock.RESPOND_WITH_SERVER_DOWN
+        return val, 'error_status', 'error_description', None
     if action == 'GET':
         return _get_handler(resource)
     elif action == 'DELETE':
@@ -114,6 +121,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
             {'RESPOND_WITH_ERROR': False})
         rest_call_function_mock.__dict__.update(
             {'TEMPLATES_MISSING': False})
+        rest_call_function_mock.__dict__.update(
+            {'RESPOND_WITH_SERVER_DOWN': 200})
 
         self.operation_completer_start_mock = mock.Mock(
             return_value=None)
@@ -121,13 +130,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
             return_value=None)
         self.driver_rest_call_mock = mock.Mock(
             side_effect=rest_call_function_mock)
+        self.flip_servers_mock = mock.Mock(
+            return_value=None)
+        self.recover_mock = mock.Mock(
+            side_effect=_recover_function_mock)
 
         radware_driver = self.plugin_instance.drivers['radware']
         radware_driver.completion_handler.start = (
             self.operation_completer_start_mock)
         radware_driver.completion_handler.join = (
             self.operation_completer_join_mock)
+        self.orig_call = radware_driver.rest_client.call
+        self.orig__call = radware_driver.rest_client._call
         radware_driver.rest_client.call = self.driver_rest_call_mock
+        radware_driver.rest_client._call = self.driver_rest_call_mock
+        radware_driver.rest_client._flip_servers = self.flip_servers_mock
+        radware_driver.rest_client._recover = self.recover_mock
         radware_driver.completion_handler.rest_client.call = (
             self.driver_rest_call_mock)
 
@@ -136,6 +154,34 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
 
         self.addCleanup(radware_driver.completion_handler.join)
 
+    def test_rest_client_recover_was_called(self):
+        """Call the real REST client and verify _recover is called."""
+        radware_driver = self.plugin_instance.drivers['radware']
+        radware_driver.rest_client.call = self.orig_call
+        radware_driver.rest_client._call = self.orig__call
+        self.assertRaises(r_exc.RESTRequestFailure,
+                          radware_driver._verify_workflow_templates)
+        self.recover_mock.assert_called_once()
+
+    def test_rest_client_flip_servers(self):
+        radware_driver = self.plugin_instance.drivers['radware']
+        server = radware_driver.rest_client.server
+        sec_server = radware_driver.rest_client.secondary_server
+        radware_driver.rest_client._flip_servers()
+        self.assertEqual(server,
+                         radware_driver.rest_client.secondary_server)
+        self.assertEqual(sec_server,
+                         radware_driver.rest_client.server)
+
+    def test_verify_workflow_templates_server_down(self):
+        """Test the rest call failure when backend is down."""
+        for value in SERVER_DOWN_CODES:
+            rest_call_function_mock.__dict__.update(
+                {'RESPOND_WITH_SERVER_DOWN': value})
+            self.assertRaises(r_exc.RESTRequestFailure,
+                              self.plugin_instance.drivers['radware'].
+                              _verify_workflow_templates)
+
     def test_verify_workflow_templates(self):
         """Test the rest call failure handling by Exception raising."""
         rest_call_function_mock.__dict__.update(