review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Cancelling thread start while unit tests are running
author Evgeny Fedoruk <evgenyf@radware.com>
Tue, 18 Mar 2014 17:55:11 +0000 (10:55 -0700)
committer Evgeny Fedoruk <evgenyf@radware.com>
Fri, 28 Mar 2014 16:20:55 +0000 (09:20 -0700)
This change modifies the Radware driver and its unit test code so that
the operations completion thread is not started while unit tests are running.

Driver initialization no longer starts the operations completion thread;
the thread is started only when an operation completion item is inserted
into the queue for the first time.
The operation completion handling was moved to a new function which is
called by the completion thread's run() function.
The run() function still pops operation completion items off the queue
and pushes items that are not yet completed back onto it.
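
A minimal sketch of this lazy-start pattern, kept in the Python 2 style of
the code base. The CompletionHandler/Driver classes and the
_push_operation() helper are illustrative stand-ins; only
completion_handler_started, _start_completion_handling_thread() and the
put_nowait() call are names taken from the diff below:

    import Queue
    import threading


    class CompletionHandler(threading.Thread):
        """Illustrative stand-in for OperationCompletionHandler."""

        def __init__(self, queue):
            super(CompletionHandler, self).__init__()
            self.queue = queue

        def handle_operation_completion(self, oper):
            # Placeholder: poll the backend, update the DB and return
            # True once the operation has completed.
            return True

        def run(self):
            while True:
                oper = self.queue.get()
                if not self.handle_operation_completion(oper):
                    # Not completed yet - push it back for another pass.
                    self.queue.put_nowait(oper)


    class Driver(object):
        """Illustrative stand-in for LoadBalancerDriver."""

        def __init__(self):
            self.queue = Queue.Queue()
            self.completion_handler = CompletionHandler(self.queue)
            self.completion_handler.setDaemon(True)
            # The thread is no longer started here.
            self.completion_handler_started = False

        def _start_completion_handling_thread(self):
            # Start the handler lazily, on the first queued operation.
            if not self.completion_handler_started:
                self.completion_handler.start()
                self.completion_handler_started = True

        def _push_operation(self, oper):
            # Illustrative wrapper; the driver does this inside its
            # _update_workflow()/_remove_workflow() methods.
            self._start_completion_handling_thread()
            self.queue.put_nowait(oper)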

The unit test code mocks the operation completion items queue by calling
the completion handler's new function directly whenever an item is added.

The thread's start() and join() functions were mocked to do nothing.

All sleep() calls were removed from the unit test code.
All unnecessary reset_mock() calls were removed.
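
A rough sketch of the resulting test wiring, again in Python 2 style.
The QueueMock idea and handle_operation_completion() come from the diff
below; the wire_driver_for_tests() helper is only an illustration of what
the test setUp() does, and this QueueMock calls Queue.Queue.__init__()
directly so the sketch runs on plain Python 2:

    import Queue

    import mock


    class QueueMock(Queue.Queue):
        """Illustrative version of the QueueMock added in the diff below."""

        def __init__(self, completion_handler):
            Queue.Queue.__init__(self)
            self.completion_handler = completion_handler

        def put_nowait(self, oper):
            # Handle the operation inline instead of queueing it, so
            # tests never have to sleep() waiting for a worker thread.
            self.completion_handler(oper)


    def wire_driver_for_tests(radware_driver):
        # Illustrative helper; the real setUp() performs the same wiring.
        # start()/join() become no-ops, so no thread is ever spawned.
        radware_driver.completion_handler.start = mock.Mock(return_value=None)
        radware_driver.completion_handler.join = mock.Mock(return_value=None)
        radware_driver.queue = QueueMock(
            radware_driver.completion_handler.handle_operation_completion)

With this wiring every put_nowait() runs the completion handler to the end
before the test continues, which is why the eventlet sleep() calls below
could be dropped.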

Change-Id: I72380bf223be690831aba1fc29c3dca910245516
Closes-Bug: #1245208

neutron/services/loadbalancer/drivers/radware/driver.py
neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py

index dae5d144548009df6a6bbdcf120ba617eec17388..cb159f7c84b353c6b447212e89c59a55618f05df 100644
@@ -177,7 +177,7 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
                                                              plugin)
         self.workflow_templates_exists = False
         self.completion_handler.setDaemon(True)
-        self.completion_handler.start()
+        self.completion_handler_started = False
 
     def create_vip(self, context, vip):
         LOG.debug(_('create_vip. vip: %s'), str(vip))
@@ -340,6 +340,12 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
             context, extended_vip['subnet_id'])
         return subnet['network_id']
 
+    def _start_completion_handling_thread(self):
+        if not self.completion_handler_started:
+            LOG.info(_('Starting operation completion handling thread'))
+            self.completion_handler.start()
+            self.completion_handler_started = True
+
     @call_log.log
     def _update_workflow(self, wf_name, action,
                          wf_params, context,
@@ -371,6 +377,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
                                        entity_id,
                                        delete=delete)
             LOG.debug(_('Pushing operation %s to the queue'), oper)
+
+            self._start_completion_handling_thread()
             self.queue.put_nowait(oper)
 
     def _remove_workflow(self, ids, context):
@@ -391,6 +399,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
                                        ids['vip'],
                                        delete=True)
             LOG.debug(_('Pushing operation %s to the queue'), oper)
+
+            self._start_completion_handling_thread()
             self.queue.put_nowait(oper)
 
     def _remove_service(self, service_name):
@@ -619,24 +629,52 @@ class OperationCompletionHandler(threading.Thread):
         self.stoprequest = threading.Event()
         self.opers_to_handle_before_rest = 0
 
-    def _get_db_status(self, operation, success, messages=None):
-        """Get the db_status based on the status of the vdirect operation."""
-        if not success:
-            # we have a failure - log it and set the return ERROR as DB state
-            msg = ', '.join(messages) if messages else "unknown"
-            error_params = {"operation": operation, "msg": msg}
-            LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
-                      error_params)
-            return constants.ERROR
-        if operation.delete:
-            return None
-        else:
-            return constants.ACTIVE
-
     def join(self, timeout=None):
         self.stoprequest.set()
         super(OperationCompletionHandler, self).join(timeout)
 
+    def handle_operation_completion(self, oper):
+        result = self.rest_client.call('GET',
+                                       oper.operation_url,
+                                       None,
+                                       None)
+        completed = result[RESP_DATA]['complete']
+        reason = result[RESP_REASON]
+        description = result[RESP_STR]
+        if completed:
+            # operation is done - update the DB with the status
+            # or delete the entire graph from DB
+            success = result[RESP_DATA]['success']
+            sec_to_completion = time.time() - oper.creation_time
+            debug_data = {'oper': oper,
+                          'sec_to_completion': sec_to_completion,
+                          'success': success}
+            LOG.debug(_('Operation %(oper)s is completed after '
+                      '%(sec_to_completion)d sec '
+                      'with success status: %(success)s :'),
+                      debug_data)
+            db_status = None
+            if not success:
+                # failure - log it and set the return ERROR as DB state
+                if reason or description:
+                    msg = 'Reason:%s. Description:%s' % (reason, description)
+                else:
+                    msg = "unknown"
+                error_params = {"operation": oper, "msg": msg}
+                LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
+                          error_params)
+                db_status = constants.ERROR
+            else:
+                if oper.delete:
+                    _remove_object_from_db(self.plugin, oper)
+                else:
+                    db_status = constants.ACTIVE
+
+            if db_status:
+                _update_vip_graph_status(self.plugin, oper, db_status)
+
+        return completed
+
     def run(self):
         oper = None
         while not self.stoprequest.isSet():
@@ -653,31 +691,7 @@ class OperationCompletionHandler(threading.Thread):
                           str(oper))
                 # check the status - if oper is done: update the db ,
                 # else push the oper again to the queue
-                result = self.rest_client.call('GET',
-                                               oper.operation_url,
-                                               None,
-                                               None)
-                completed = result[RESP_DATA]['complete']
-                if completed:
-                    # operation is done - update the DB with the status
-                    # or delete the entire graph from DB
-                    success = result[RESP_DATA]['success']
-                    sec_to_completion = time.time() - oper.creation_time
-                    debug_data = {'oper': oper,
-                                  'sec_to_completion': sec_to_completion,
-                                  'success': success}
-                    LOG.debug(_('Operation %(oper)s is completed after '
-                              '%(sec_to_completion)d sec '
-                              'with success status: %(success)s :'),
-                              debug_data)
-                    db_status = self._get_db_status(oper, success)
-                    if db_status:
-                        _update_vip_graph_status(
-                            self.plugin, oper, db_status)
-                    else:
-                        _remove_object_from_db(
-                            self.plugin, oper)
-                else:
+                if not self.handle_operation_completion(oper):
                     LOG.debug(_('Operation %s is not completed yet..') % oper)
                     # Not completed - push to the queue again
                     self.queue.put_nowait(oper)
index 4d244406262eb7ba5375c1c4a4b2600d9a0b51a1..ffbb9866b4833b9966d4f1f40618ee26f9943084 100644
 #
 # @author: Avishay Balderman, Radware
 
+import Queue
 import re
 
 import contextlib
-import eventlet
 import mock
 
 from neutron import context
@@ -34,8 +34,16 @@ from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
 GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
 
 
-def rest_call_function_mock(action, resource, data, headers, binary=False):
+class QueueMock(Queue.Queue):
+    def __init__(self, completion_handler):
+        self.completion_handler = completion_handler
+        super(QueueMock, self).__init__()
+
+    def put_nowait(self, oper):
+        self.completion_handler(oper)
+
 
+def rest_call_function_mock(action, resource, data, headers, binary=False):
     if rest_call_function_mock.RESPOND_WITH_ERROR:
         return 400, 'error_status', 'error_description', None
 
@@ -107,13 +115,24 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
         rest_call_function_mock.__dict__.update(
             {'TEMPLATES_MISSING': False})
 
-        self.rest_call_mock = mock.Mock(name='rest_call_mock',
-                                        side_effect=rest_call_function_mock,
-                                        spec=self.plugin_instance.
-                                        drivers['radware'].
-                                        rest_client.call)
+        self.operation_completer_start_mock = mock.Mock(
+            return_value=None)
+        self.operation_completer_join_mock = mock.Mock(
+            return_value=None)
+        self.driver_rest_call_mock = mock.Mock(
+            side_effect=rest_call_function_mock)
+
         radware_driver = self.plugin_instance.drivers['radware']
-        radware_driver.rest_client.call = self.rest_call_mock
+        radware_driver.completion_handler.start = (
+            self.operation_completer_start_mock)
+        radware_driver.completion_handler.join = (
+            self.operation_completer_join_mock)
+        radware_driver.rest_client.call = self.driver_rest_call_mock
+        radware_driver.completion_handler.rest_client.call = (
+            self.driver_rest_call_mock)
+
+        radware_driver.queue = QueueMock(
+            radware_driver.completion_handler.handle_operation_completion)
 
         self.addCleanup(radware_driver.completion_handler.join)
 
@@ -128,7 +147,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
 
     def test_create_vip_failure(self):
         """Test the rest call failure handling by Exception raising."""
-        self.rest_call_mock.reset_mock()
         with self.network(do_delete=False) as network:
             with self.subnet(network=network, do_delete=False) as subnet:
                 with self.pool(no_delete=True, provider='radware') as pool:
@@ -155,9 +173,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                       {'vip': vip_data})
 
     def test_create_vip(self):
-        self.skipTest("Skipping test till bug 1288312 is fixed")
-
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware') as pool:
                 vip_data = {
@@ -211,10 +226,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                     mock.call('GET', '/api/workflow/' +
                               pool['pool']['id'], None, None)
                 ]
-                self.rest_call_mock.assert_has_calls(calls, any_order=True)
-
-                # sleep to wait for the operation completion
-                eventlet.greenthread.sleep(0)
+                self.driver_rest_call_mock.assert_has_calls(calls,
+                                                            any_order=True)
 
                 #Test DB
                 new_vip = self.plugin_instance.get_vip(
@@ -232,12 +245,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                     mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'],
                               None, None)
                 ]
-                self.rest_call_mock.assert_has_calls(calls, any_order=True)
-                # need to wait some time to allow driver to delete vip
-                eventlet.greenthread.sleep(1)
+                self.driver_rest_call_mock.assert_has_calls(
+                    calls, any_order=True)
 
     def test_update_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware', no_delete=True) as pool:
                 vip_data = {
@@ -268,15 +279,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                               '/action/BaseCreate',
                               mock.ANY, driver.TEMPLATE_HEADER),
                 ]
-                self.rest_call_mock.assert_has_calls(calls, any_order=True)
-
-                updated_vip = self.plugin_instance.get_vip(
-                    context.get_admin_context(), vip['id'])
-                self.assertEqual(updated_vip['status'],
-                                 constants.PENDING_UPDATE)
+                self.driver_rest_call_mock.assert_has_calls(
+                    calls, any_order=True)
 
-                # sleep to wait for the operation completion
-                eventlet.greenthread.sleep(1)
                 updated_vip = self.plugin_instance.get_vip(
                     context.get_admin_context(), vip['id'])
                 self.assertEqual(updated_vip['status'], constants.ACTIVE)
@@ -286,7 +291,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                     context.get_admin_context(), vip['id'])
 
     def test_delete_vip_failure(self):
-        self.rest_call_mock.reset_mock()
         plugin = self.plugin_instance
 
         with self.network(do_delete=False) as network:
@@ -306,8 +310,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                             context.get_admin_context(), hm, pool['pool']['id']
                         )
 
-                        eventlet.greenthread.sleep(1)
-
                         rest_call_function_mock.__dict__.update(
                             {'RESPOND_WITH_ERROR': True})
 
@@ -333,7 +335,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                         self.assertEqual(u_phm['status'], constants.ACTIVE)
 
     def test_delete_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware', no_delete=True) as pool:
                 vip_data = {
@@ -360,14 +361,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                     mock.call('DELETE', '/api/workflow/' + pool['pool']['id'],
                               None, None)
                 ]
-                self.rest_call_mock.assert_has_calls(calls, any_order=True)
+                self.driver_rest_call_mock.assert_has_calls(
+                    calls, any_order=True)
 
                 self.assertRaises(loadbalancer.VipNotFound,
                                   self.plugin_instance.get_vip,
                                   context.get_admin_context(), vip['id'])
 
     def test_update_pool(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet():
             with self.pool() as pool:
                 del pool['pool']['provider']
@@ -380,7 +381,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                 self.assertEqual(pool_db['status'], constants.PENDING_UPDATE)
 
     def test_delete_pool_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware', no_delete=True) as pool:
                 with self.vip(pool=pool, subnet=subnet):
@@ -390,7 +390,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                       pool['pool']['id'])
 
     def test_create_member_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware') as p:
                 with self.vip(pool=p, subnet=subnet):
@@ -407,11 +406,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                 mock.ANY, driver.TEMPLATE_HEADER
                             )
                         ]
-                        self.rest_call_mock.assert_has_calls(calls,
-                                                             any_order=True)
+                        self.driver_rest_call_mock.assert_has_calls(
+                            calls, any_order=True)
 
     def test_update_member_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware') as p:
                 with self.member(pool_id=p['pool']['id']) as member:
@@ -432,16 +430,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                 mock.ANY, driver.TEMPLATE_HEADER
                             )
                         ]
-                        self.rest_call_mock.assert_has_calls(calls,
-                                                             any_order=True)
+                        self.driver_rest_call_mock.assert_has_calls(
+                            calls, any_order=True)
 
                         updated_member = self.plugin_instance.get_member(
                             context.get_admin_context(),
                             member['member']['id']
                         )
 
-                        # sleep to wait for the operation completion
-                        eventlet.greenthread.sleep(0)
                         updated_member = self.plugin_instance.get_member(
                             context.get_admin_context(),
                             member['member']['id']
@@ -450,7 +446,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                          constants.ACTIVE)
 
     def test_update_member_without_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet():
             with self.pool(provider='radware') as pool:
                 with self.member(pool_id=pool['pool']['id']) as member:
@@ -463,7 +458,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                      constants.PENDING_UPDATE)
 
     def test_delete_member_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.pool(provider='radware') as p:
                 with self.member(pool_id=p['pool']['id'],
@@ -474,21 +468,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                         # wait for being sure the member
                         # Changed status from PENDING-CREATE
                         # to ACTIVE
-                        self.rest_call_mock.reset_mock()
-                        eventlet.greenthread.sleep(1)
 
                         self.plugin_instance.delete_member(
                             context.get_admin_context(),
                             m['member']['id']
                         )
 
-                        args, kwargs = self.rest_call_mock.call_args
+                        name, args, kwargs = (
+                            self.driver_rest_call_mock.mock_calls[-2]
+                        )
                         deletion_post_graph = str(args[2])
 
                         self.assertTrue(re.search(
                             r'.*\'member_address_array\': \[\].*',
                             deletion_post_graph
                         ))
+
                         calls = [
                             mock.call(
                                 'POST', '/api/workflow/' + p['pool']['id'] +
@@ -496,17 +491,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                 mock.ANY, driver.TEMPLATE_HEADER
                             )
                         ]
-                        self.rest_call_mock.assert_has_calls(
+                        self.driver_rest_call_mock.assert_has_calls(
                             calls, any_order=True)
 
-                        eventlet.greenthread.sleep(1)
                         self.assertRaises(loadbalancer.MemberNotFound,
                                           self.plugin_instance.get_member,
                                           context.get_admin_context(),
                                           m['member']['id'])
 
     def test_delete_member_without_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet():
             with self.pool(provider='radware') as p:
                 with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
@@ -519,7 +512,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                       m['member']['id'])
 
     def test_create_hm_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.health_monitor() as hm:
                 with self.pool(provider='radware') as pool:
@@ -543,11 +535,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                 mock.ANY, driver.TEMPLATE_HEADER
                             )
                         ]
-                        self.rest_call_mock.assert_has_calls(
+                        self.driver_rest_call_mock.assert_has_calls(
                             calls, any_order=True)
 
-                        eventlet.greenthread.sleep(1)
-
                         phm = self.plugin_instance.get_pool_health_monitor(
                             context.get_admin_context(),
                             hm['health_monitor']['id'], pool['pool']['id']
@@ -555,7 +545,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                         self.assertEqual(phm['status'], constants.ACTIVE)
 
     def test_delete_pool_hm_with_vip(self):
-        self.rest_call_mock.reset_mock()
         with self.subnet() as subnet:
             with self.health_monitor(no_delete=True) as hm:
                 with self.pool(provider='radware') as pool:
@@ -565,21 +554,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                             hm, pool['pool']['id']
                         )
 
-                        # Reset mock and
-                        # wait for being sure that status
-                        # changed from PENDING-CREATE
-                        # to ACTIVE
-                        self.rest_call_mock.reset_mock()
-                        eventlet.greenthread.sleep(1)
-
                         self.plugin_instance.delete_pool_health_monitor(
                             context.get_admin_context(),
                             hm['health_monitor']['id'],
                             pool['pool']['id']
                         )
 
-                        eventlet.greenthread.sleep(1)
-                        name, args, kwargs = self.rest_call_mock.mock_calls[-2]
+                        name, args, kwargs = (
+                            self.driver_rest_call_mock.mock_calls[-2]
+                        )
                         deletion_post_graph = str(args[2])
 
                         self.assertTrue(re.search(
@@ -594,7 +577,7 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
                                 mock.ANY, driver.TEMPLATE_HEADER
                             )
                         ]
-                        self.rest_call_mock.assert_has_calls(
+                        self.driver_rest_call_mock.assert_has_calls(
                             calls, any_order=True)
 
                         self.assertRaises(