# Agent's polling interval in seconds
# polling_interval = 2
+# Minimize polling by monitoring ovsdb for interface changes
+# minimize_polling = False
+
# (ListOpt) The types of tenant network tunnels supported by the agent.
# Setting this will enable tunneling support in the agent. This can be set to
# either 'gre' or 'vxlan'. If this is unset, it will default to [] and
# from the old mechanism
ovs-vsctl: CommandFilter, ovs-vsctl, root
ovs-ofctl: CommandFilter, ovs-ofctl, root
+kill_ovsdb_client: KillFilter, root, /usr/bin/ovsdb-client, -9
+ovsdb-client: CommandFilter, ovsdb-client, root
xe: CommandFilter, xe, root
# ip_lib
self.respawn_interval = respawn_interval
self._process = None
self._kill_event = None
+ self._reset_queues()
+ self._watchers = []
+
+ def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
- self._watchers = []
def start(self):
"""Launch a process and monitor it asynchronously."""
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+
+from neutron.agent.linux import async_process
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class OvsdbMonitor(async_process.AsyncProcess):
+ """Manages an invocation of 'ovsdb-client monitor'."""
+
+ def __init__(self, table_name, columns=None, format=None,
+ root_helper=None, respawn_interval=None):
+
+ cmd = ['ovsdb-client', 'monitor', table_name]
+ if columns:
+ cmd.append(','.join(columns))
+ if format:
+ cmd.append('--format=%s' % format)
+ super(OvsdbMonitor, self).__init__(cmd,
+ root_helper=root_helper,
+ respawn_interval=respawn_interval)
+
+ def _read_stdout(self):
+ data = self._process.stdout.readline()
+ if not data:
+ return
+ #TODO(marun) The default root helper outputs exit errors to
+ # stdout due to bug #1219530. This check can be moved to
+ # _read_stderr once the error is correctly output to stderr.
+ if self.root_helper and self.root_helper in data:
+ self._stderr_lines.put(data)
+ LOG.error(_('Error received from ovsdb monitor: %s') % data)
+ else:
+ self._stdout_lines.put(data)
+ LOG.debug(_('Output received from ovsdb monitor: %s') % data)
+ return data
+
+ def _read_stderr(self):
+ data = super(OvsdbMonitor, self)._read_stderr()
+ if data:
+ LOG.error(_('Error received from ovsdb monitor: %s') % data)
+ # Do not return value to ensure that stderr output will
+ # stop the monitor.
+
+
+class SimpleInterfaceMonitor(OvsdbMonitor):
+ """Monitors the Interface table of the local host's ovsdb for changes.
+
+ The has_updates() method indicates whether changes to the ovsdb
+ Interface table have been detected since the monitor started or
+ since the previous access.
+ """
+
+ def __init__(self, root_helper=None, respawn_interval=None):
+ super(SimpleInterfaceMonitor, self).__init__(
+ 'Interface',
+ columns=['name'],
+ format='json',
+ root_helper=root_helper,
+ respawn_interval=respawn_interval,
+ )
+ self.data_received = False
+
+ @property
+ def is_active(self):
+ return (self.data_received and
+ self._kill_event and
+ not self._kill_event.ready())
+
+ @property
+ def has_updates(self):
+ """Indicate whether the ovsdb Interface table has been updated.
+
+ True will be returned if the monitor process is not active.
+ This 'failing open' minimizes the risk of falsely indicating
+ the absense of updates at the expense of potential false
+ positives.
+ """
+ return bool(list(self.iter_stdout())) or not self.is_active
+
+ def start(self, block=False, timeout=5):
+ super(SimpleInterfaceMonitor, self).start()
+ if block:
+ eventlet.timeout.Timeout(timeout)
+ while not self.is_active:
+ eventlet.sleep()
+
+ def _kill(self, *args, **kwargs):
+ self.data_received = False
+ super(SimpleInterfaceMonitor, self)._kill(*args, **kwargs)
+
+ def _read_stdout(self):
+ data = super(SimpleInterfaceMonitor, self)._read_stdout()
+ if data and not self.data_received:
+ self.data_received = True
+ return data
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+
+import eventlet
+
+from neutron.agent.linux import ovsdb_monitor
+
+
+@contextlib.contextmanager
+def get_polling_manager(minimize_polling=False, root_helper=None):
+ if minimize_polling:
+ pm = InterfacePollingMinimizer(root_helper=root_helper)
+ pm.start()
+ else:
+ pm = AlwaysPoll()
+ try:
+ yield pm
+ finally:
+ if minimize_polling:
+ pm.stop()
+
+
+class BasePollingManager(object):
+
+ def __init__(self):
+ self._force_polling = False
+ self._polling_completed = True
+
+ def force_polling(self):
+ self._force_polling = True
+
+ def polling_completed(self):
+ self._polling_completed = True
+
+ def _is_polling_required(self):
+ raise NotImplemented
+
+ @property
+ def is_polling_required(self):
+ # Always consume the updates to minimize polling.
+ polling_required = self._is_polling_required()
+
+ # Polling is required regardless of whether updates have been
+ # detected.
+ if self._force_polling:
+ self._force_polling = False
+ polling_required = True
+
+ # Polling is required if not yet done for previously detected
+ # updates.
+ if not self._polling_completed:
+ polling_required = True
+
+ if polling_required:
+ # Track whether polling has been completed to ensure that
+ # polling can be required until the caller indicates via a
+ # call to polling_completed() that polling has been
+ # successfully performed.
+ self._polling_completed = False
+
+ return polling_required
+
+
+class AlwaysPoll(BasePollingManager):
+
+ @property
+ def is_polling_required(self):
+ return True
+
+
+class InterfacePollingMinimizer(BasePollingManager):
+ """Monitors ovsdb to determine when polling is required."""
+
+ def __init__(self, root_helper=None):
+ super(InterfacePollingMinimizer, self).__init__()
+ self._monitor = ovsdb_monitor.SimpleInterfaceMonitor(
+ root_helper=root_helper)
+
+ def start(self):
+ self._monitor.start()
+
+ def stop(self):
+ self._monitor.stop()
+
+ def _is_polling_required(self):
+ # Maximize the chances of update detection having a chance to
+ # collect output.
+ eventlet.sleep()
+ return self._monitor.has_updates
from neutron.agent import l2population_rpc
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import polling
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
def __init__(self, integ_br, tun_br, local_ip,
bridge_mappings, root_helper,
polling_interval, tunnel_types=None,
- veth_mtu=None, l2_population=False):
+ veth_mtu=None, l2_population=False,
+ minimize_polling=False):
'''Constructor.
:param integ_br: name of the integration bridge.
the agent. If set, will automatically set enable_tunneling to
True.
:param veth_mtu: MTU size for veth interfaces.
+ :param minimize_polling: Optional, whether to minimize polling by
+ monitoring ovsdb for interface changes.
'''
self.veth_mtu = veth_mtu
self.root_helper = root_helper
constants.TYPE_VXLAN: {}}
self.polling_interval = polling_interval
+ self.minimize_polling = minimize_polling
if tunnel_types:
self.enable_tunneling = True
resync = True
return resync
- def rpc_loop(self):
+ def rpc_loop(self, polling_manager=None):
+ if not polling_manager:
+ polling_manager = polling.AlwaysPoll()
+
sync = True
ports = set()
ancillary_ports = set()
ports.clear()
ancillary_ports.clear()
sync = False
+ polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
tunnel_sync = self.tunnel_sync()
- port_info = self.update_ports(ports)
-
- # notify plugin about port deltas
- if port_info:
- LOG.debug(_("Agent loop has new devices!"))
- # If treat devices fails - must resync with plugin
- sync = self.process_network_ports(port_info)
- ports = port_info['current']
+ if polling_manager.is_polling_required:
+ port_info = self.update_ports(ports)
- # Treat ancillary devices if they exist
- if self.ancillary_brs:
- port_info = self.update_ancillary_ports(ancillary_ports)
+ # notify plugin about port deltas
if port_info:
- rc = self.process_ancillary_network_ports(port_info)
- ancillary_ports = port_info['current']
- sync = sync | rc
+ LOG.debug(_("Agent loop has new devices!"))
+ # If treat devices fails - must resync with plugin
+ sync = self.process_network_ports(port_info)
+ ports = port_info['current']
+
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ port_info = self.update_ancillary_ports(
+ ancillary_ports)
+ if port_info:
+ rc = self.process_ancillary_network_ports(
+ port_info)
+ ancillary_ports = port_info['current']
+ sync = sync | rc
+
+ polling_manager.polling_completed()
except Exception:
LOG.exception(_("Error in agent event loop"))
'elapsed': elapsed})
def daemon_loop(self):
- self.rpc_loop()
+ with polling.get_polling_manager(self.minimize_polling,
+ self.root_helper) as pm:
+ self.rpc_loop(polling_manager=pm)
def check_ovs_version(min_required_version, root_helper):
bridge_mappings=bridge_mappings,
root_helper=config.AGENT.root_helper,
polling_interval=config.AGENT.polling_interval,
+ minimize_polling=config.AGENT.minimize_polling,
tunnel_types=config.AGENT.tunnel_types,
veth_mtu=config.AGENT.veth_mtu,
l2_population=config.AGENT.l2_population,
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
+ cfg.BoolOpt('minimize_polling',
+ default=False,
+ help=_("Minimize polling by monitoring ovsdb for interface "
+ "changes.")),
cfg.ListOpt('tunnel_types', default=DEFAULT_TUNNEL_TYPES,
help=_("Network types supported by the agent "
"(gre and/or vxlan)")),
"""Base Test Case for all Unit Tests"""
+import contextlib
import logging
import os
+import eventlet.timeout
import fixtures
from oslo.config import cfg
import stubout
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
+
+ @contextlib.contextmanager
+ def assert_max_execution_time(self, max_execution_time=5):
+ with eventlet.timeout.Timeout(max_execution_time, False):
+ yield
+ return
+ self.fail('Execution of this test timed out')
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import eventlet
-import eventlet.timeout
import fixtures
from neutron.agent.linux import async_process
output += new_output
eventlet.sleep(0.01)
- @contextlib.contextmanager
- def assert_max_execution_time(self, max_execution_time=5):
- with eventlet.timeout.Timeout(max_execution_time, False):
- yield
- return
- self.fail('Execution of this test timed out')
-
def test_stopping_async_process_lifecycle(self):
with self.assert_max_execution_time():
proc = async_process.AsyncProcess(['tail', '-f',
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests in this module will be skipped unless:
+
+ - ovsdb-client is installed
+
+ - ovsdb-client can be invoked via password-less sudo
+
+ - OS_SUDO_TESTING is set to '1' or 'True' in the test execution
+ environment
+
+
+The jenkins gate does not allow direct sudo invocation during test
+runs, but configuring OS_SUDO_TESTING ensures that developers are
+still able to execute tests that require the capability.
+"""
+
+import os
+import random
+
+import eventlet
+
+from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import ovsdb_monitor
+from neutron.agent.linux import utils
+from neutron.tests import base
+
+
+def get_rand_name(name='test'):
+ return name + str(random.randint(1, 0x7fffffff))
+
+
+def create_ovs_resource(name_prefix, creation_func):
+ """Create a new ovs resource that does not already exist.
+
+ :param name_prefix: The prefix for a randomly generated name
+ :param creation_func: A function taking the name of the resource
+ to be created. An error is assumed to indicate a name
+ collision.
+ """
+ while True:
+ name = get_rand_name(name_prefix)
+ try:
+ return creation_func(name)
+ except RuntimeError:
+ continue
+ break
+
+
+class BaseMonitorTest(base.BaseTestCase):
+
+ def setUp(self):
+ super(BaseMonitorTest, self).setUp()
+
+ self._check_test_requirements()
+
+ self.root_helper = 'sudo'
+ self.ovs = ovs_lib.BaseOVS(self.root_helper)
+ self.bridge = create_ovs_resource('test-br-', self.ovs.add_bridge)
+
+ def cleanup_bridge():
+ self.bridge.destroy()
+ self.addCleanup(cleanup_bridge)
+
+ def _check_command(self, cmd, error_text, skip_msg):
+ try:
+ utils.execute(cmd)
+ except RuntimeError as e:
+ if error_text in str(e):
+ self.skipTest(skip_msg)
+ raise
+
+ def _check_test_requirements(self):
+ if os.environ.get('OS_SUDO_TESTING') not in base.TRUE_STRING:
+ self.skipTest('testing with sudo is not enabled')
+ self._check_command(['which', 'ovsdb-client'],
+ 'Exit code: 1',
+ 'ovsdb-client is not installed')
+ self._check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'],
+ 'Exit code: 1',
+ 'password-less sudo not granted for ovsdb-client')
+
+
+class TestOvsdbMonitor(BaseMonitorTest):
+
+ def setUp(self):
+ super(TestOvsdbMonitor, self).setUp()
+
+ self.monitor = ovsdb_monitor.OvsdbMonitor('Bridge',
+ root_helper=self.root_helper)
+ self.addCleanup(self.monitor.stop)
+ self.monitor.start()
+
+ def collect_initial_output(self):
+ while True:
+ output = list(self.monitor.iter_stdout())
+ if output:
+ return output[0]
+ eventlet.sleep(0.01)
+
+ def test_killed_monitor_respawns(self):
+ with self.assert_max_execution_time():
+ self.monitor.respawn_interval = 0
+ old_pid = self.monitor._process.pid
+ output1 = self.collect_initial_output()
+ pid = self.monitor._get_pid_to_kill()
+ self.monitor._reset_queues()
+ self.monitor._kill_process(pid)
+ while (self.monitor._process.pid == old_pid):
+ eventlet.sleep(0.01)
+ output2 = self.collect_initial_output()
+ # Initial output should appear twice
+ self.assertEqual(output1, output2)
+
+
+class TestSimpleInterfaceMonitor(BaseMonitorTest):
+
+ def setUp(self):
+ super(TestSimpleInterfaceMonitor, self).setUp()
+
+ self.monitor = ovsdb_monitor.SimpleInterfaceMonitor(
+ root_helper=self.root_helper)
+ self.addCleanup(self.monitor.stop)
+ self.monitor.start(block=True)
+
+ def test_has_updates(self):
+ self.assertTrue(self.monitor.has_updates,
+ 'Initial call should always be true')
+ self.assertFalse(self.monitor.has_updates,
+ 'has_updates without port addition should be False')
+ create_ovs_resource('test-port-', self.bridge.add_port)
+ with self.assert_max_execution_time():
+ # has_updates after port addition should become True
+ while not self.monitor.has_updates:
+ eventlet.sleep(0.01)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet.event
+import mock
+
+from neutron.agent.linux import ovsdb_monitor
+from neutron.tests import base
+
+
+class TestOvsdbMonitor(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestOvsdbMonitor, self).setUp()
+ self.root_helper = 'sudo'
+ self.monitor = ovsdb_monitor.OvsdbMonitor('Interface',
+ root_helper=self.root_helper)
+
+ def read_output_queues_and_returns_result(self, output_type, output):
+ with mock.patch.object(self.monitor, '_process') as mock_process:
+ with mock.patch.object(mock_process, output_type) as mock_file:
+ with mock.patch.object(mock_file, 'readline') as mock_readline:
+ mock_readline.return_value = output
+ func = getattr(self.monitor,
+ '_read_%s' % output_type,
+ None)
+ return func()
+
+ def test__read_stdout_returns_none_for_empty_read(self):
+ result = self.read_output_queues_and_returns_result('stdout', '')
+ self.assertIsNone(result)
+
+ def test__read_stdout_queues_root_wrapper_errors_to_stderr_output(self):
+ result = self.read_output_queues_and_returns_result('stdout',
+ self.root_helper)
+ self.assertIsNone(result)
+ self.assertEqual(self.monitor._stderr_lines.get_nowait(),
+ self.root_helper)
+
+ def test__read_stdout_queues_normal_output_to_stdout_queue(self):
+ output = 'foo'
+ result = self.read_output_queues_and_returns_result('stdout', output)
+ self.assertEqual(result, output)
+ self.assertEqual(self.monitor._stdout_lines.get_nowait(), output)
+
+ def test__read_stderr_returns_none(self):
+ result = self.read_output_queues_and_returns_result('stderr', '')
+ self.assertIsNone(result)
+
+
+class TestSimpleInterfaceMonitor(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestSimpleInterfaceMonitor, self).setUp()
+ self.root_helper = 'sudo'
+ self.monitor = ovsdb_monitor.SimpleInterfaceMonitor(
+ root_helper=self.root_helper)
+
+ def test_is_active_is_false_by_default(self):
+ self.assertFalse(self.monitor.is_active)
+
+ def test_is_active_can_be_true(self):
+ self.monitor.data_received = True
+ self.monitor._kill_event = eventlet.event.Event()
+ self.assertTrue(self.monitor.is_active)
+
+ def test_has_updates_is_true_by_default(self):
+ self.assertTrue(self.monitor.has_updates)
+
+ def test_has_updates_is_false_if_active_with_no_output(self):
+ target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
+ '.is_active')
+ with mock.patch(target,
+ new_callable=mock.PropertyMock(return_value=True)):
+ self.assertFalse(self.monitor.has_updates)
+
+ def test__kill_sets_data_received_to_false(self):
+ self.monitor.data_received = True
+ with mock.patch(
+ 'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._kill'):
+ self.monitor._kill()
+ self.assertFalse(self.monitor.data_received)
+
+ def test__read_stdout_sets_data_received_and_returns_output(self):
+ output = 'foo'
+ with mock.patch(
+ 'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._read_stdout',
+ return_value=output):
+ result = self.monitor._read_stdout()
+ self.assertTrue(self.monitor.data_received)
+ self.assertEqual(result, output)
+
+ def test__read_stdout_does_not_set_data_received_for_empty_ouput(self):
+ output = None
+ with mock.patch(
+ 'neutron.agent.linux.ovsdb_monitor.OvsdbMonitor._read_stdout',
+ return_value=output):
+ self.monitor._read_stdout()
+ self.assertFalse(self.monitor.data_received)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.linux import polling
+from neutron.tests import base
+
+
+class TestGetPollingManager(base.BaseTestCase):
+
+ def test_return_always_poll_by_default(self):
+ with polling.get_polling_manager() as pm:
+ self.assertEqual(pm.__class__, polling.AlwaysPoll)
+
+ def test_manage_polling_minimizer(self):
+ mock_target = 'neutron.agent.linux.polling.InterfacePollingMinimizer'
+ with mock.patch('%s.start' % mock_target) as mock_start:
+ with mock.patch('%s.stop' % mock_target) as mock_stop:
+ with polling.get_polling_manager(minimize_polling=True,
+ root_helper='test') as pm:
+ self.assertEqual(pm._monitor.root_helper, 'test')
+ self.assertEqual(pm.__class__,
+ polling.InterfacePollingMinimizer)
+ mock_stop.assert_has_calls(mock.call())
+ mock_start.assert_has_calls(mock.call())
+
+
+class TestBasePollingManager(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestBasePollingManager, self).setUp()
+ self.pm = polling.BasePollingManager()
+
+ def test_force_polling_sets_interval_attribute(self):
+ self.assertFalse(self.pm._force_polling)
+ self.pm.force_polling()
+ self.assertTrue(self.pm._force_polling)
+
+ def test_polling_completed_sets_interval_attribute(self):
+ self.pm._polling_completed = False
+ self.pm.polling_completed()
+ self.assertTrue(self.pm._polling_completed)
+
+ def mock_is_polling_required(self, return_value):
+ return mock.patch.object(self.pm, '_is_polling_required',
+ return_value=return_value)
+
+ def test_is_polling_required_returns_true_when_forced(self):
+ with self.mock_is_polling_required(False):
+ self.pm.force_polling()
+ self.assertTrue(self.pm.is_polling_required)
+ self.assertFalse(self.pm._force_polling)
+
+ def test_is_polling_required_returns_true_when_polling_not_completed(self):
+ with self.mock_is_polling_required(False):
+ self.pm._polling_completed = False
+ self.assertTrue(self.pm.is_polling_required)
+
+ def test_is_polling_required_returns_true_when_updates_are_present(self):
+ with self.mock_is_polling_required(True):
+ self.assertTrue(self.pm.is_polling_required)
+ self.assertFalse(self.pm._polling_completed)
+
+ def test_is_polling_required_returns_false_for_no_updates(self):
+ with self.mock_is_polling_required(False):
+ self.assertFalse(self.pm.is_polling_required)
+
+
+class TestAlwaysPoll(base.BaseTestCase):
+
+ def test_is_polling_required_always_returns_true(self):
+ pm = polling.AlwaysPoll()
+ self.assertTrue(pm.is_polling_required)
+
+
+class TestInterfacePollingMinimizer(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestInterfacePollingMinimizer, self).setUp()
+ self.pm = polling.InterfacePollingMinimizer()
+
+ def test_start_calls_monitor_start(self):
+ with mock.patch.object(self.pm._monitor, 'start') as mock_start:
+ self.pm.start()
+ mock_start.assert_called_with()
+
+ def test_stop_calls_monitor_stop(self):
+ with mock.patch.object(self.pm._monitor, 'stop') as mock_stop:
+ self.pm.stop()
+ mock_stop.assert_called_with()
+
+ def mock_has_updates(self, return_value):
+ target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
+ '.has_updates')
+ return mock.patch(
+ target,
+ new_callable=mock.PropertyMock(return_value=return_value),
+ )
+
+ def test__is_polling_required_returns_when_updates_are_present(self):
+ with self.mock_has_updates(True):
+ self.assertTrue(self.pm._is_polling_required())
self.agent.reclaim_local_vlan('net2')
del_port_fn.assert_called_once_with('gre-ip_agent_2')
+ def test_daemon_loop_uses_polling_manager(self):
+ with mock.patch(
+ 'neutron.agent.linux.polling.get_polling_manager') as mock_get_pm:
+ with mock.patch.object(self.agent, 'rpc_loop') as mock_loop:
+ self.agent.daemon_loop()
+ mock_get_pm.assert_called_with(False, 'sudo')
+ mock_loop.called_once()
+
class AncillaryBridgesTest(base.BaseTestCase):