# under the License.
import eventlet
-import os
import netaddr
from oslo_config import cfg
class SubProcessBase(object):
def __init__(self, root_helper=None, namespace=None,
log_fail_as_error=True):
- self.root_helper = root_helper
self.namespace = namespace
self.log_fail_as_error = log_fail_as_error
try:
elif self.force_root:
# Force use of the root helper to ensure that commands
# will execute in dom0 when running under XenServer/XCP.
- return self._execute(options, command, args, self.root_helper,
+ return self._execute(options, command, args, run_as_root=True,
log_fail_as_error=self.log_fail_as_error)
else:
return self._execute(options, command, args,
log_fail_as_error=self.log_fail_as_error)
- def enforce_root_helper(self):
- if not self.root_helper and os.geteuid() != 0:
- raise exceptions.SudoRequired()
-
def _as_root(self, options, command, args, use_root_namespace=False):
- self.enforce_root_helper()
-
namespace = self.namespace if not use_root_namespace else None
- return self._execute(options,
- command,
- args,
- self.root_helper,
- namespace,
+ return self._execute(options, command, args, run_as_root=True,
+ namespace=namespace,
log_fail_as_error=self.log_fail_as_error)
@classmethod
- def _execute(cls, options, command, args, root_helper=None,
+ def _execute(cls, options, command, args, run_as_root=False,
namespace=None, log_fail_as_error=True):
opt_list = ['-%s' % o for o in options]
if namespace:
ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
else:
ip_cmd = ['ip']
- return utils.execute(ip_cmd + opt_list + [command] + list(args),
- root_helper=root_helper,
+ cmd = ip_cmd + opt_list + [command] + list(args)
+ return utils.execute(cmd, run_as_root=run_as_root,
log_fail_as_error=log_fail_as_error)
def set_log_fail_as_error(self, fail_with_error):
class IPWrapper(SubProcessBase):
def __init__(self, root_helper=None, namespace=None):
- super(IPWrapper, self).__init__(root_helper=root_helper,
- namespace=namespace)
+ super(IPWrapper, self).__init__(namespace=namespace)
self.netns = IpNetnsCommand(self)
def device(self, name):
- return IPDevice(name, self.root_helper, self.namespace)
+ return IPDevice(name, namespace=self.namespace)
def get_devices(self, exclude_loopback=False):
retval = []
- output = self._execute(['o', 'd'], 'link', ('list',),
- self.root_helper, self.namespace)
+ output = self._run(['o', 'd'], 'link', ('list',))
for line in output.split('\n'):
if '<' not in line:
continue
if exclude_loopback and name == LOOPBACK_DEVNAME:
continue
- retval.append(IPDevice(name,
- self.root_helper,
- self.namespace))
+ retval.append(IPDevice(name, namespace=self.namespace))
return retval
def add_tuntap(self, name, mode='tap'):
self._as_root('', 'tuntap', ('add', name, 'mode', mode))
- return IPDevice(name, self.root_helper, self.namespace)
+ return IPDevice(name, namespace=self.namespace)
def add_veth(self, name1, name2, namespace2=None):
args = ['add', name1, 'type', 'veth', 'peer', 'name', name2]
self._as_root('', 'link', tuple(args))
- return (IPDevice(name1, self.root_helper, self.namespace),
- IPDevice(name2, self.root_helper, namespace2))
+ return (IPDevice(name1, namespace=self.namespace),
+ IPDevice(name2, namespace=namespace2))
def del_veth(self, name):
"""Delete a virtual interface between two namespaces."""
lo = ip.device(LOOPBACK_DEVNAME)
lo.link.set_up()
else:
- ip = IPWrapper(self.root_helper, name)
+ ip = IPWrapper(namespace=name)
return ip
def namespace_is_empty(self):
elif port:
raise exceptions.NetworkVxlanPortRangeError(vxlan_range=port)
self._as_root('', 'link', cmd)
- return (IPDevice(name, self.root_helper, self.namespace))
+ return (IPDevice(name, namespace=self.namespace))
@classmethod
- def get_namespaces(cls, root_helper):
- output = cls._execute('', 'netns', ('list',), root_helper=root_helper)
+ def get_namespaces(cls, root_helper=None):
+ output = cls._execute('', 'netns', ('list',))
return [l.strip() for l in output.split('\n')]
class IPDevice(SubProcessBase):
def __init__(self, name, root_helper=None, namespace=None):
- super(IPDevice, self).__init__(root_helper=root_helper,
- namespace=namespace)
+ super(IPDevice, self).__init__(namespace=namespace)
self.name = name
self.link = IpLinkCommand(self)
self.addr = IpAddrCommand(self)
def add(self, name):
self._as_root('add', name, use_root_namespace=True)
- wrapper = IPWrapper(self._parent.root_helper, name)
+ wrapper = IPWrapper(namespace=name)
wrapper.netns.execute(['sysctl', '-w',
'net.ipv4.conf.all.promote_secondaries=1'])
return wrapper
def execute(self, cmds, addl_env=None, check_exit_code=True,
extra_ok_codes=None):
ns_params = []
+ kwargs = {}
if self._parent.namespace:
- self._parent.enforce_root_helper()
+ kwargs['run_as_root'] = True
ns_params = ['ip', 'netns', 'exec', self._parent.namespace]
env_params = []
if addl_env:
env_params = (['env'] +
['%s=%s' % pair for pair in addl_env.items()])
- return utils.execute(
- ns_params + env_params + list(cmds),
- root_helper=self._parent.root_helper,
- check_exit_code=check_exit_code, extra_ok_codes=extra_ok_codes)
+ cmd = ns_params + env_params + list(cmds)
+ return utils.execute(cmd, check_exit_code=check_exit_code,
+ extra_ok_codes=extra_ok_codes, **kwargs)
def exists(self, name):
- root_helper = self._parent.root_helper
- if not cfg.CONF.AGENT.use_helper_for_ns_read:
- root_helper = None
- output = self._parent._execute('o', 'netns', ['list'],
- root_helper=root_helper)
+ output = self._parent._execute(
+ 'o', 'netns', ['list'],
+ run_as_root=cfg.CONF.AGENT.use_helper_for_ns_read)
for line in output.split('\n'):
if name == line.strip():
return True
def device_exists(device_name, root_helper=None, namespace=None):
"""Return True if the device exists in the namespace."""
try:
- dev = IPDevice(device_name, root_helper, namespace)
+ dev = IPDevice(device_name, namespace=namespace)
dev.set_log_fail_as_error(False)
address = dev.link.address
except RuntimeError:
exists in the namespace.
"""
try:
- device = IPDevice(device_name, root_helper, namespace)
+ device = IPDevice(device_name, namespace=namespace)
if mac != device.link.address:
return False
if ip_cidr not in (ip['cidr'] for ip in device.addr.list()):
'device': device_name}
"""
- ip_wrapper = IPWrapper(root_helper, namespace=namespace)
+ ip_wrapper = IPWrapper(namespace=namespace)
table = ip_wrapper.netns.execute(['ip', 'route'], check_exit_code=True)
routes = []
def ensure_device_is_ready(device_name, root_helper=None, namespace=None):
- dev = IPDevice(device_name, root_helper, namespace)
+ dev = IPDevice(device_name, namespace=namespace)
dev.set_log_fail_as_error(False)
try:
# Ensure the device is up, even if it is already up. If the device
def iproute_arg_supported(command, arg, root_helper=None):
command += ['help']
- stdout, stderr = utils.execute(command, root_helper=root_helper,
- check_exit_code=False, return_stderr=True)
+ stdout, stderr = utils.execute(command, check_exit_code=False,
+ return_stderr=True)
return any(arg in line for line in stderr.split('\n'))
-def _arping(ns_name, iface_name, address, count, root_helper):
+def _arping(ns_name, iface_name, address, count):
arping_cmd = ['arping', '-A', '-I', iface_name, '-c', count, address]
try:
- ip_wrapper = IPWrapper(root_helper, namespace=ns_name)
+ ip_wrapper = IPWrapper(namespace=ns_name)
ip_wrapper.netns.execute(arping_cmd, check_exit_code=True)
except Exception:
msg = _LE("Failed sending gratuitous ARP "
'ns': ns_name})
-def send_gratuitous_arp(ns_name, iface_name, address, count, root_helper):
+def send_gratuitous_arp(ns_name, iface_name, address, count, root_helper=None):
"""Send a gratuitous arp using given namespace, interface, and address"""
def arping():
- _arping(ns_name, iface_name, address, count, root_helper)
+ _arping(ns_name, iface_name, address, count)
if count > 0:
eventlet.spawn_n(arping)
-def send_garp_for_proxyarp(ns_name, iface_name, address, count, root_helper):
+def send_garp_for_proxyarp(ns_name, iface_name, address, count,
+ root_helper=None):
"""
Send a gratuitous arp using given namespace, interface, and address
"""
def arping_with_temporary_address():
# Configure the address on the interface
- device = IPDevice(iface_name, root_helper, namespace=ns_name)
+ device = IPDevice(iface_name, namespace=ns_name)
net = netaddr.IPNetwork(str(address))
device.addr.add(net.version, str(net), str(net.broadcast))
- _arping(ns_name, iface_name, address, count, root_helper)
+ _arping(ns_name, iface_name, address, count)
# Delete the address from the interface
device.addr.delete(net.version, str(net))
# License for the specific language governing permissions and limitations
# under the License.
-import os
-
import mock
import netaddr
self.execute = self.execute_p.start()
def test_execute_wrapper(self):
- ip_lib.SubProcessBase._execute('o', 'link', ('list',), 'sudo')
+ ip_lib.SubProcessBase._execute('o', 'link', ('list',),
+ run_as_root=True)
self.execute.assert_called_once_with(['ip', '-o', 'link', 'list'],
- root_helper='sudo',
+ run_as_root=True,
log_fail_as_error=True)
def test_execute_wrapper_int_options(self):
ip_lib.SubProcessBase._execute([4], 'link', ('list',))
self.execute.assert_called_once_with(['ip', '-4', 'link', 'list'],
- root_helper=None,
+ run_as_root=False,
log_fail_as_error=True)
def test_execute_wrapper_no_options(self):
ip_lib.SubProcessBase._execute([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'link', 'list'],
- root_helper=None,
+ run_as_root=False,
log_fail_as_error=True)
def test_run_no_namespace(self):
base = ip_lib.SubProcessBase('sudo')
base._run([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'link', 'list'],
- root_helper=None,
+ run_as_root=False,
log_fail_as_error=True)
def test_run_namespace(self):
base._run([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns',
'ip', 'link', 'list'],
- root_helper='sudo',
+ run_as_root=True,
log_fail_as_error=True)
def test_as_root_namespace(self):
base._as_root([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns',
'ip', 'link', 'list'],
- root_helper='sudo',
+ run_as_root=True,
log_fail_as_error=True)
- def test_enforce_root_helper_no_root_helper(self):
- base = ip_lib.SubProcessBase()
- not_root = 42
- with mock.patch.object(os, 'geteuid', return_value=not_root):
- self.assertRaises(exceptions.SudoRequired,
- base.enforce_root_helper)
-
- def test_enforce_root_helper_with_root_helper_supplied(self):
- base = ip_lib.SubProcessBase('sudo')
- try:
- base.enforce_root_helper()
- except exceptions.SudoRequired:
- self.fail('enforce_root_helper should not raise SudoRequired '
- 'when a root_helper is supplied.')
-
- def test_enforce_root_helper_with_no_root_helper_but_root(self):
- base = ip_lib.SubProcessBase()
- with mock.patch.object(os, 'geteuid', return_value=0):
- try:
- base.enforce_root_helper()
- except exceptions.SudoRequired:
- self.fail('enforce_root_helper should not require a root '
- 'helper when run as root.')
-
class TestIpWrapper(base.BaseTestCase):
def setUp(self):
ip_lib.IPDevice('bar@bar:bar')])
self.execute.assert_called_once_with(['o', 'd'], 'link', ('list',),
- 'sudo', None)
+ log_fail_as_error=True)
def test_get_devices_malformed_line(self):
self.execute.return_value = '\n'.join(LINK_SAMPLE + ['gibberish'])
ip_lib.IPDevice('bar@bar:bar')])
self.execute.assert_called_once_with(['o', 'd'], 'link', ('list',),
- 'sudo', None)
+ log_fail_as_error=True)
def test_get_namespaces(self):
self.execute.return_value = '\n'.join(NETNS_SAMPLE)
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc'])
- self.execute.assert_called_once_with('', 'netns', ('list',),
- root_helper='sudo')
+ self.execute.assert_called_once_with('', 'netns', ('list',))
def test_add_tuntap(self):
ip_lib.IPWrapper('sudo').add_tuntap('tap0')
self.execute.assert_called_once_with('', 'tuntap',
('add', 'tap0', 'mode', 'tap'),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_veth(self):
self.execute.assert_called_once_with('', 'link',
('add', 'tap0', 'type', 'veth',
'peer', 'name', 'tap1'),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_del_veth(self):
ip_lib.IPWrapper('sudo').del_veth('fpr-1234')
self.execute.assert_called_once_with('', 'link',
('del', 'fpr-1234'),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_veth_with_namespaces(self):
('add', 'tap0', 'type', 'veth',
'peer', 'name', 'tap1',
'netns', ns2),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_get_device(self):
dev = ip_lib.IPWrapper('sudo', 'ns').device('eth0')
- self.assertEqual(dev.root_helper, 'sudo')
self.assertEqual(dev.namespace, 'ns')
self.assertEqual(dev.name, 'eth0')
ns_exists.return_value = False
ip.ensure_namespace('ns')
self.execute.assert_has_calls(
- [mock.call([], 'netns', ('add', 'ns'), 'sudo', None,
+ [mock.call([], 'netns', ('add', 'ns'),
+ run_as_root=True, namespace=None,
log_fail_as_error=True)])
- ip_dev.assert_has_calls([mock.call('lo', 'sudo', 'ns'),
+ ip_dev.assert_has_calls([mock.call('lo', namespace='ns'),
mock.call().link.set_up()])
def test_ensure_namespace_existing(self):
'ttl', 'ttl0', 'tos', 'tos0',
'local', 'local0', 'proxy',
'port', '1', '2'],
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_vxlan_invalid_port_length(self):
('add', 'from', ip,
'table', table,
'priority', priority),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def _test_delete_rule(self, ip, table, priority):
self.execute.assert_called_once_with([ip_version], 'rule',
('del', 'table', table,
'priority', priority),
- 'sudo', None,
+ run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_rule_v4(self):
execute.assert_called_once_with(
['ip', 'netns', 'exec', 'ns',
'sysctl', '-w', 'net.ipv4.conf.all.promote_secondaries=1'],
- root_helper='sudo', check_exit_code=True, extra_ok_codes=None)
+ run_as_root=True, check_exit_code=True, extra_ok_codes=None)
def test_delete_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute'):
self.netns_cmd.delete('ns')
self._assert_sudo([], ('delete', 'ns'), force_root_namespace=True)
- def test_namespace_exists(self):
+ def test_namespace_exists_use_helper(self):
+ self.config(group='AGENT', use_helper_for_ns_read=True)
retval = '\n'.join(NETNS_SAMPLE)
# need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
self.assertTrue(
netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'))
execute.assert_called_once_with(['ip', '-o', 'netns', 'list'],
- root_helper=None,
+ run_as_root=True,
log_fail_as_error=True)
- def test_namespace_doest_not_exist(self):
+ def test_namespace_doest_not_exist_no_helper(self):
+ self.config(group='AGENT', use_helper_for_ns_read=False)
retval = '\n'.join(NETNS_SAMPLE)
# need another instance to avoid mocking
netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase())
self.assertFalse(
netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb'))
execute.assert_called_once_with(['ip', '-o', 'netns', 'list'],
- root_helper=None,
+ run_as_root=False,
log_fail_as_error=True)
def test_execute(self):
self.netns_cmd.execute(['ip', 'link', 'list'])
execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip',
'link', 'list'],
- root_helper='sudo',
+ run_as_root=True,
check_exit_code=True,
extra_ok_codes=None)
['ip', 'netns', 'exec', 'ns', 'env'] +
['%s=%s' % (k, v) for k, v in env.items()] +
['ip', 'link', 'list'],
- root_helper='sudo', check_exit_code=True, extra_ok_codes=None)
+ run_as_root=True, check_exit_code=True, extra_ok_codes=None)
def test_execute_nosudo_with_no_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute') as execute:
self.parent.root_helper = None
self.netns_cmd.execute(['test'])
execute.assert_called_once_with(['test'],
- root_helper=None,
check_exit_code=True,
extra_ok_codes=None)
mock.sentinel.root_helper)
self.assertTrue(spawn_n.called)
- mIPWrapper.assert_called_once_with(mock.sentinel.root_helper,
- namespace=mock.sentinel.ns_name)
+ mIPWrapper.assert_called_once_with(namespace=mock.sentinel.ns_name)
ip_wrapper = mIPWrapper(mock.sentinel.root_helper,
mock.sentinel.ns_name)
# Check that the address was added to the interface before arping
def check_added_address(*args, **kwargs):
mIPDevice.assert_called_once_with(mock.sentinel.iface_name,
- mock.sentinel.root_helper,
namespace=mock.sentinel.ns_name)
device.addr.add.assert_called_once_with(4, addr + '/32', addr)
self.assertFalse(device.addr.delete.called)