-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 Nicira, Inc.
# All Rights Reserved
#
func, sp=SyncParameters(min_chunk_size))
state_synchronizer.start(
periodic_interval_max=state_sync_interval)
+ return state_synchronizer
class NvpSynchronizer():
raise nvp_exc.NvpPluginException(err_msg=err_msg)
# Backoff time in case of failures while fetching sync data
self._sync_backoff = 1
- _start_loopingcall(min_chunk_size, state_sync_interval,
- self._synchronize_state)
+ # Store the looping call in an instance variable to allow unit tests
+ # for controlling its lifecycle
+ self._sync_looping_call = _start_loopingcall(
+ min_chunk_size, state_sync_interval, self._synchronize_state)
def _get_tag_dict(self, tags):
return dict((tag.get('scope'), tag['tag']) for tag in tags)
cfg.CONF.set_override('use_namespaces', True)
agent_config.register_root_helper(cfg.CONF)
- self.device_exists_p = mock.patch(
+ device_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.device_exists')
- self.device_exists = self.device_exists_p.start()
+ device_exists_p.start()
- self.utils_exec_p = mock.patch(
+ utils_exec_p = mock.patch(
'neutron.agent.linux.utils.execute')
- self.utils_exec = self.utils_exec_p.start()
+ utils_exec_p.start()
- self.drv_cls_p = mock.patch('neutron.agent.linux.interface.NullDriver')
- driver_cls = self.drv_cls_p.start()
- self.mock_driver = mock.MagicMock()
- self.mock_driver.DEV_NAME_LEN = (
+ drv_cls_p = mock.patch('neutron.agent.linux.interface.NullDriver')
+ driver_cls = drv_cls_p.start()
+ mock_driver = mock.MagicMock()
+ mock_driver.DEV_NAME_LEN = (
interface.LinuxInterfaceDriver.DEV_NAME_LEN)
- driver_cls.return_value = self.mock_driver
+ driver_cls.return_value = mock_driver
- self.l3_plugin_p = mock.patch(
+ l3_plugin_p = mock.patch(
'neutron.agent.l3_agent.L3PluginApi')
- l3_plugin_cls = self.l3_plugin_p.start()
- self.plugin_api = mock.Mock()
- l3_plugin_cls.return_value = self.plugin_api
+ l3_plugin_cls = l3_plugin_p.start()
+ l3_plugin_cls.return_value = mock.Mock()
self.external_process_p = mock.patch(
'neutron.agent.linux.external_process.ProcessManager'
)
- self.external_process = self.external_process_p.start()
-
+ self.external_process_p.start()
+ looping_call_p = mock.patch(
+ 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
+ looping_call_p.start()
self.agent = l3_agent.L3NATAgent(HOSTNAME)
-
- def tearDown(self):
- self.device_exists_p.stop()
- self.utils_exec_p.stop()
- self.drv_cls_p.stop()
- self.l3_plugin_p.stop()
- self.external_process_p.stop()
- super(TestL3AgentEventHandler, self).tearDown()
+ self.addCleanup(mock.patch.stopall)
def test_spawn_metadata_proxy(self):
router_id = _uuid()