ip: IpFilter, ip, root
ip_exec: IpNetnsExecFilter, ip, root
+# For ip monitor
+kill_ip_monitor: KillFilter, root, ip, -9
+
# ovs_lib (if OVSInterfaceDriver is used)
ovs-vsctl: CommandFilter, ovs-vsctl, root
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
- def start(self, blocking=False):
+ def start(self, block=False):
"""Launch a process and monitor it asynchronously.
- :param blocking: Block until the process has started.
+ :param block: Block until the process has started.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
- if blocking:
+ if block:
utils.wait_until_true(self.is_active)
- def stop(self, blocking=False):
+ def stop(self, block=False):
"""Halt the process and watcher threads.
- :param blocking: Block until the process has stopped.
+ :param block: Block until the process has stopped.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
else:
raise AsyncProcessException(_('Process is not running.'))
- if blocking:
+ if block:
utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
- def _iter_queue(self, queue):
+ def _iter_queue(self, queue, block):
while True:
try:
- yield queue.get_nowait()
+ yield queue.get(block=block)
except eventlet.queue.Empty:
break
- def iter_stdout(self):
- return self._iter_queue(self._stdout_lines)
+ def iter_stdout(self, block=False):
+ return self._iter_queue(self._stdout_lines, block)
- def iter_stderr(self):
- return self._iter_queue(self._stderr_lines)
+ def iter_stderr(self, block=False):
+ return self._iter_queue(self._stderr_lines, block)
--- /dev/null
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_utils import excutils
+
+from neutron.agent.linux import async_process
+from neutron.i18n import _LE
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class IPMonitorEvent(object):
+ def __init__(self, line, added, interface, cidr):
+ self.line = line
+ self.added = added
+ self.interface = interface
+ self.cidr = cidr
+
+ def __str__(self):
+ return self.line
+
+ @classmethod
+ def from_text(cls, line):
+ route = line.split()
+
+ try:
+ first_word = route[0]
+ except IndexError:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE('Unable to parse route "%s"'), line)
+
+ added = (first_word != 'Deleted')
+ if not added:
+ route = route[1:]
+
+ try:
+ interface = route[1]
+ cidr = route[3]
+ except IndexError:
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE('Unable to parse route "%s"'), line)
+
+ return cls(line, added, interface, cidr)
+
+
+class IPMonitor(async_process.AsyncProcess):
+ """Wrapper over `ip monitor address`.
+
+ To monitor and react indefinitely:
+ m = IPMonitor(namespace='tmp')
+ m.start()
+ for iterable in m:
+ event = IPMonitorEvent.from_text(iterable)
+ print event, event.added, event.interface, event.cidr
+ """
+
+ def __init__(self,
+ namespace=None,
+ respawn_interval=None):
+ super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
+ run_as_root=True,
+ respawn_interval=respawn_interval,
+ namespace=namespace)
+
+ def __iter__(self):
+ return self.iter_stdout(block=True)
+
+ def start(self):
+ super(IPMonitor, self).start(block=True)
+
+ def stop(self):
+ super(IPMonitor, self).stop(block=True)
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
self.addCleanup(self._safe_stop, proc)
- proc.start(blocking=True)
+ proc.start(block=True)
self._check_stdout(proc)
- proc.stop(blocking=True)
+ proc.stop(block=True)
# Ensure that the process and greenthreads have stopped
proc._process.wait()
--- /dev/null
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import ip_monitor
+from neutron.tests.functional.agent.linux import test_ip_lib
+
+
+class TestIPMonitor(test_ip_lib.IpLibTestFramework):
+ def setUp(self):
+ super(TestIPMonitor, self).setUp()
+ attr = self.generate_device_details()
+ self.device = self.manage_device(attr)
+ self.monitor = ip_monitor.IPMonitor(attr.namespace)
+ self.addCleanup(self._safe_stop_monitor)
+
+ def _safe_stop_monitor(self):
+ try:
+ self.monitor.stop()
+ except async_process.AsyncProcessException:
+ pass
+
+ def test_ip_monitor_lifecycle(self):
+ self.assertFalse(self.monitor.is_active())
+ self.monitor.start()
+ self.assertTrue(self.monitor.is_active())
+ self.monitor.stop()
+ self.assertFalse(self.monitor.is_active())
+
+ def test_ip_monitor_events(self):
+ self.monitor.start()
+
+ cidr = '169.254.128.1/24'
+ self.device.addr.add(4, cidr, '169.254.128.255')
+ self._assert_event(expected_name=self.device.name,
+ expected_cidr=cidr,
+ expected_added=True,
+ event=ip_monitor.IPMonitorEvent.from_text(
+ next(self.monitor.iter_stdout(block=True))))
+
+ self.device.addr.delete(4, cidr)
+ self._assert_event(expected_name=self.device.name,
+ expected_cidr=cidr,
+ expected_added=False,
+ event=ip_monitor.IPMonitorEvent.from_text(
+ next(self.monitor.iter_stdout(block=True))))
+
+ def _assert_event(self,
+ expected_name,
+ expected_cidr,
+ expected_added,
+ event):
+ self.assertEqual(expected_name, event.interface)
+ self.assertEqual(expected_added, event.added)
+ self.assertEqual(expected_cidr, event.cidr)
def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
cmd = ['tail', '-f', self.test_file_path]
proc = async_process.AsyncProcess(cmd)
- proc.start(blocking=True)
+ proc.start(block=True)
self.addCleanup(proc.stop)
pid = proc.pid
mock_start.assert_called_once_with()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
- result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
+ result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
+ False))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
- result = list(self.proc._iter_queue(queue))
+ result = list(self.proc._iter_queue(queue, False))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
- mock_iter_queue.assert_called_with(queue)
+ mock_iter_queue.assert_called_with(queue, False)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
--- /dev/null
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import ip_monitor
+from neutron.tests import base
+
+
+class TestIPMonitorEvent(base.BaseTestCase):
+ def test_from_text_parses_added_line(self):
+ event = ip_monitor.IPMonitorEvent.from_text(
+ '3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 '
+ 'scope global dynamic wlp3s0\ valid_lft 300sec '
+ 'preferred_lft 300sec')
+ self.assertEqual('wlp3s0', event.interface)
+ self.assertTrue(event.added)
+ self.assertEqual('192.168.3.59/24', event.cidr)
+
+ def test_from_text_parses_deleted_line(self):
+ event = ip_monitor.IPMonitorEvent.from_text(
+ 'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\''
+ ' valid_lft forever preferred_lft forever')
+ self.assertEqual('lo', event.interface)
+ self.assertFalse(event.added)
+ self.assertEqual('127.0.0.2/8', event.cidr)