plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
- self.completion_handler.start()
+ self.completion_handler_started = False
def create_vip(self, context, vip):
LOG.debug(_('create_vip. vip: %s'), str(vip))
context, extended_vip['subnet_id'])
return subnet['network_id']
+ def _start_completion_handling_thread(self):
+ if not self.completion_handler_started:
+ LOG.info(_('Starting operation completion handling thread'))
+ self.completion_handler.start()
+ self.completion_handler_started = True
+
@call_log.log
def _update_workflow(self, wf_name, action,
wf_params, context,
entity_id,
delete=delete)
LOG.debug(_('Pushing operation %s to the queue'), oper)
+
+ self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_workflow(self, ids, context):
ids['vip'],
delete=True)
LOG.debug(_('Pushing operation %s to the queue'), oper)
+
+ self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_service(self, service_name):
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0
- def _get_db_status(self, operation, success, messages=None):
- """Get the db_status based on the status of the vdirect operation."""
- if not success:
- # we have a failure - log it and set the return ERROR as DB state
- msg = ', '.join(messages) if messages else "unknown"
- error_params = {"operation": operation, "msg": msg}
- LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
- error_params)
- return constants.ERROR
- if operation.delete:
- return None
- else:
- return constants.ACTIVE
-
def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHandler, self).join(timeout)
+ def handle_operation_completion(self, oper):
+ result = self.rest_client.call('GET',
+ oper.operation_url,
+ None,
+ None)
+ completed = result[RESP_DATA]['complete']
+ reason = result[RESP_REASON],
+ description = result[RESP_STR]
+ if completed:
+ # operation is done - update the DB with the status
+ # or delete the entire graph from DB
+ success = result[RESP_DATA]['success']
+ sec_to_completion = time.time() - oper.creation_time
+ debug_data = {'oper': oper,
+ 'sec_to_completion': sec_to_completion,
+ 'success': success}
+ LOG.debug(_('Operation %(oper)s is completed after '
+ '%(sec_to_completion)d sec '
+ 'with success status: %(success)s :'),
+ debug_data)
+ db_status = None
+ if not success:
+ # failure - log it and set the return ERROR as DB state
+ if reason or description:
+ msg = 'Reason:%s. Description:%s' % (reason, description)
+ else:
+ msg = "unknown"
+ error_params = {"operation": oper, "msg": msg}
+ LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
+ error_params)
+ db_status = constants.ERROR
+ else:
+ if oper.delete:
+ _remove_object_from_db(self.plugin, oper)
+ else:
+ db_status = constants.ACTIVE
+
+ if db_status:
+ _update_vip_graph_status(self.plugin, oper, db_status)
+
+ return completed
+
def run(self):
oper = None
while not self.stoprequest.isSet():
str(oper))
# check the status - if oper is done: update the db ,
# else push the oper again to the queue
- result = self.rest_client.call('GET',
- oper.operation_url,
- None,
- None)
- completed = result[RESP_DATA]['complete']
- if completed:
- # operation is done - update the DB with the status
- # or delete the entire graph from DB
- success = result[RESP_DATA]['success']
- sec_to_completion = time.time() - oper.creation_time
- debug_data = {'oper': oper,
- 'sec_to_completion': sec_to_completion,
- 'success': success}
- LOG.debug(_('Operation %(oper)s is completed after '
- '%(sec_to_completion)d sec '
- 'with success status: %(success)s :'),
- debug_data)
- db_status = self._get_db_status(oper, success)
- if db_status:
- _update_vip_graph_status(
- self.plugin, oper, db_status)
- else:
- _remove_object_from_db(
- self.plugin, oper)
- else:
+ if not self.handle_operation_completion(oper):
LOG.debug(_('Operation %s is not completed yet..') % oper)
# Not completed - push to the queue again
self.queue.put_nowait(oper)
#
# @author: Avishay Balderman, Radware
+import Queue
import re
import contextlib
-import eventlet
import mock
from neutron import context
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
-def rest_call_function_mock(action, resource, data, headers, binary=False):
+class QueueMock(Queue.Queue):
+ def __init__(self, completion_handler):
+ self.completion_handler = completion_handler
+ super(QueueMock, self).__init__()
+
+ def put_nowait(self, oper):
+ self.completion_handler(oper)
+
+def rest_call_function_mock(action, resource, data, headers, binary=False):
if rest_call_function_mock.RESPOND_WITH_ERROR:
return 400, 'error_status', 'error_description', None
rest_call_function_mock.__dict__.update(
{'TEMPLATES_MISSING': False})
- self.rest_call_mock = mock.Mock(name='rest_call_mock',
- side_effect=rest_call_function_mock,
- spec=self.plugin_instance.
- drivers['radware'].
- rest_client.call)
+ self.operation_completer_start_mock = mock.Mock(
+ return_value=None)
+ self.operation_completer_join_mock = mock.Mock(
+ return_value=None)
+ self.driver_rest_call_mock = mock.Mock(
+ side_effect=rest_call_function_mock)
+
radware_driver = self.plugin_instance.drivers['radware']
- radware_driver.rest_client.call = self.rest_call_mock
+ radware_driver.completion_handler.start = (
+ self.operation_completer_start_mock)
+ radware_driver.completion_handler.join = (
+ self.operation_completer_join_mock)
+ radware_driver.rest_client.call = self.driver_rest_call_mock
+ radware_driver.completion_handler.rest_client.call = (
+ self.driver_rest_call_mock)
+
+ radware_driver.queue = QueueMock(
+ radware_driver.completion_handler.handle_operation_completion)
self.addCleanup(radware_driver.completion_handler.join)
def test_create_vip_failure(self):
"""Test the rest call failure handling by Exception raising."""
- self.rest_call_mock.reset_mock()
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
with self.pool(no_delete=True, provider='radware') as pool:
{'vip': vip_data})
def test_create_vip(self):
- self.skipTest("Skipping test till bug 1288312 is fixed")
-
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as pool:
vip_data = {
mock.call('GET', '/api/workflow/' +
pool['pool']['id'], None, None)
]
- self.rest_call_mock.assert_has_calls(calls, any_order=True)
-
- # sleep to wait for the operation completion
- eventlet.greenthread.sleep(0)
+ self.driver_rest_call_mock.assert_has_calls(calls,
+ any_order=True)
#Test DB
new_vip = self.plugin_instance.get_vip(
mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'],
None, None)
]
- self.rest_call_mock.assert_has_calls(calls, any_order=True)
- # need to wait some time to allow driver to delete vip
- eventlet.greenthread.sleep(1)
+ self.driver_rest_call_mock.assert_has_calls(
+ calls, any_order=True)
def test_update_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER),
]
- self.rest_call_mock.assert_has_calls(calls, any_order=True)
-
- updated_vip = self.plugin_instance.get_vip(
- context.get_admin_context(), vip['id'])
- self.assertEqual(updated_vip['status'],
- constants.PENDING_UPDATE)
+ self.driver_rest_call_mock.assert_has_calls(
+ calls, any_order=True)
- # sleep to wait for the operation completion
- eventlet.greenthread.sleep(1)
updated_vip = self.plugin_instance.get_vip(
context.get_admin_context(), vip['id'])
self.assertEqual(updated_vip['status'], constants.ACTIVE)
context.get_admin_context(), vip['id'])
def test_delete_vip_failure(self):
- self.rest_call_mock.reset_mock()
plugin = self.plugin_instance
with self.network(do_delete=False) as network:
context.get_admin_context(), hm, pool['pool']['id']
)
- eventlet.greenthread.sleep(1)
-
rest_call_function_mock.__dict__.update(
{'RESPOND_WITH_ERROR': True})
self.assertEqual(u_phm['status'], constants.ACTIVE)
def test_delete_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
mock.call('DELETE', '/api/workflow/' + pool['pool']['id'],
None, None)
]
- self.rest_call_mock.assert_has_calls(calls, any_order=True)
+ self.driver_rest_call_mock.assert_has_calls(
+ calls, any_order=True)
self.assertRaises(loadbalancer.VipNotFound,
self.plugin_instance.get_vip,
context.get_admin_context(), vip['id'])
def test_update_pool(self):
- self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool() as pool:
del pool['pool']['provider']
self.assertEqual(pool_db['status'], constants.PENDING_UPDATE)
def test_delete_pool_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
with self.vip(pool=pool, subnet=subnet):
pool['pool']['id'])
def test_create_member_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.vip(pool=p, subnet=subnet):
mock.ANY, driver.TEMPLATE_HEADER
)
]
- self.rest_call_mock.assert_has_calls(calls,
- any_order=True)
+ self.driver_rest_call_mock.assert_has_calls(
+ calls, any_order=True)
def test_update_member_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id']) as member:
mock.ANY, driver.TEMPLATE_HEADER
)
]
- self.rest_call_mock.assert_has_calls(calls,
- any_order=True)
+ self.driver_rest_call_mock.assert_has_calls(
+ calls, any_order=True)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
)
- # sleep to wait for the operation completion
- eventlet.greenthread.sleep(0)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
constants.ACTIVE)
def test_update_member_without_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as pool:
with self.member(pool_id=pool['pool']['id']) as member:
constants.PENDING_UPDATE)
def test_delete_member_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'],
# wait for being sure the member
# Changed status from PENDING-CREATE
# to ACTIVE
- self.rest_call_mock.reset_mock()
- eventlet.greenthread.sleep(1)
self.plugin_instance.delete_member(
context.get_admin_context(),
m['member']['id']
)
- args, kwargs = self.rest_call_mock.call_args
+ name, args, kwargs = (
+ self.driver_rest_call_mock.mock_calls[-2]
+ )
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
r'.*\'member_address_array\': \[\].*',
deletion_post_graph
))
+
calls = [
mock.call(
'POST', '/api/workflow/' + p['pool']['id'] +
mock.ANY, driver.TEMPLATE_HEADER
)
]
- self.rest_call_mock.assert_has_calls(
+ self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
- eventlet.greenthread.sleep(1)
self.assertRaises(loadbalancer.MemberNotFound,
self.plugin_instance.get_member,
context.get_admin_context(),
m['member']['id'])
def test_delete_member_without_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
m['member']['id'])
def test_create_hm_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor() as hm:
with self.pool(provider='radware') as pool:
mock.ANY, driver.TEMPLATE_HEADER
)
]
- self.rest_call_mock.assert_has_calls(
+ self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
- eventlet.greenthread.sleep(1)
-
phm = self.plugin_instance.get_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'], pool['pool']['id']
self.assertEqual(phm['status'], constants.ACTIVE)
def test_delete_pool_hm_with_vip(self):
- self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor(no_delete=True) as hm:
with self.pool(provider='radware') as pool:
hm, pool['pool']['id']
)
- # Reset mock and
- # wait for being sure that status
- # changed from PENDING-CREATE
- # to ACTIVE
- self.rest_call_mock.reset_mock()
- eventlet.greenthread.sleep(1)
-
self.plugin_instance.delete_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'],
pool['pool']['id']
)
- eventlet.greenthread.sleep(1)
- name, args, kwargs = self.rest_call_mock.mock_calls[-2]
+ name, args, kwargs = (
+ self.driver_rest_call_mock.mock_calls[-2]
+ )
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
mock.ANY, driver.TEMPLATE_HEADER
)
]
- self.rest_call_mock.assert_has_calls(
+ self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
self.assertRaises(