From a8d5c3c6b7f59cec9f0ba8d7cc5d167cf15e5631 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Wed, 21 Jan 2015 16:11:37 -0600 Subject: [PATCH] Stop using passed root_helper in ip_lib This patch leaves the root_helper option, but stops using it as a phased implementation of dropping the argument altogether. Partially-Implements: blueprint rootwrap-daemon-mode Change-Id: I1b729241aa76b2dcb053b51c69d28e1c5359b4f7 --- neutron/agent/linux/ip_lib.py | 99 ++++++++----------- neutron/common/exceptions.py | 4 - neutron/plugins/sriovnicagent/pci_lib.py | 9 +- .../tests/unit/test_linux_external_process.py | 1 - neutron/tests/unit/test_linux_ip_lib.py | 89 ++++++----------- 5 files changed, 75 insertions(+), 127 deletions(-) diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index c10c60a00..122bc20f1 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -14,7 +14,6 @@ # under the License. import eventlet -import os import netaddr from oslo_config import cfg @@ -44,7 +43,6 @@ VLAN_INTERFACE_DETAIL = ['vlan protocol 802.1q', class SubProcessBase(object): def __init__(self, root_helper=None, namespace=None, log_fail_as_error=True): - self.root_helper = root_helper self.namespace = namespace self.log_fail_as_error = log_fail_as_error try: @@ -60,38 +58,29 @@ class SubProcessBase(object): elif self.force_root: # Force use of the root helper to ensure that commands # will execute in dom0 when running under XenServer/XCP. - return self._execute(options, command, args, self.root_helper, + return self._execute(options, command, args, run_as_root=True, log_fail_as_error=self.log_fail_as_error) else: return self._execute(options, command, args, log_fail_as_error=self.log_fail_as_error) - def enforce_root_helper(self): - if not self.root_helper and os.geteuid() != 0: - raise exceptions.SudoRequired() - def _as_root(self, options, command, args, use_root_namespace=False): - self.enforce_root_helper() - namespace = self.namespace if not use_root_namespace else None - return self._execute(options, - command, - args, - self.root_helper, - namespace, + return self._execute(options, command, args, run_as_root=True, + namespace=namespace, log_fail_as_error=self.log_fail_as_error) @classmethod - def _execute(cls, options, command, args, root_helper=None, + def _execute(cls, options, command, args, run_as_root=False, namespace=None, log_fail_as_error=True): opt_list = ['-%s' % o for o in options] if namespace: ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip'] else: ip_cmd = ['ip'] - return utils.execute(ip_cmd + opt_list + [command] + list(args), - root_helper=root_helper, + cmd = ip_cmd + opt_list + [command] + list(args) + return utils.execute(cmd, run_as_root=run_as_root, log_fail_as_error=log_fail_as_error) def set_log_fail_as_error(self, fail_with_error): @@ -100,17 +89,15 @@ class SubProcessBase(object): class IPWrapper(SubProcessBase): def __init__(self, root_helper=None, namespace=None): - super(IPWrapper, self).__init__(root_helper=root_helper, - namespace=namespace) + super(IPWrapper, self).__init__(namespace=namespace) self.netns = IpNetnsCommand(self) def device(self, name): - return IPDevice(name, self.root_helper, self.namespace) + return IPDevice(name, namespace=self.namespace) def get_devices(self, exclude_loopback=False): retval = [] - output = self._execute(['o', 'd'], 'link', ('list',), - self.root_helper, self.namespace) + output = self._run(['o', 'd'], 'link', ('list',)) for line in output.split('\n'): if '<' not in line: continue @@ -125,14 +112,12 @@ class IPWrapper(SubProcessBase): if exclude_loopback and name == LOOPBACK_DEVNAME: continue - retval.append(IPDevice(name, - self.root_helper, - self.namespace)) + retval.append(IPDevice(name, namespace=self.namespace)) return retval def add_tuntap(self, name, mode='tap'): self._as_root('', 'tuntap', ('add', name, 'mode', mode)) - return IPDevice(name, self.root_helper, self.namespace) + return IPDevice(name, namespace=self.namespace) def add_veth(self, name1, name2, namespace2=None): args = ['add', name1, 'type', 'veth', 'peer', 'name', name2] @@ -145,8 +130,8 @@ class IPWrapper(SubProcessBase): self._as_root('', 'link', tuple(args)) - return (IPDevice(name1, self.root_helper, self.namespace), - IPDevice(name2, self.root_helper, namespace2)) + return (IPDevice(name1, namespace=self.namespace), + IPDevice(name2, namespace=namespace2)) def del_veth(self, name): """Delete a virtual interface between two namespaces.""" @@ -158,7 +143,7 @@ class IPWrapper(SubProcessBase): lo = ip.device(LOOPBACK_DEVNAME) lo.link.set_up() else: - ip = IPWrapper(self.root_helper, name) + ip = IPWrapper(namespace=name) return ip def namespace_is_empty(self): @@ -197,11 +182,11 @@ class IPWrapper(SubProcessBase): elif port: raise exceptions.NetworkVxlanPortRangeError(vxlan_range=port) self._as_root('', 'link', cmd) - return (IPDevice(name, self.root_helper, self.namespace)) + return (IPDevice(name, namespace=self.namespace)) @classmethod - def get_namespaces(cls, root_helper): - output = cls._execute('', 'netns', ('list',), root_helper=root_helper) + def get_namespaces(cls, root_helper=None): + output = cls._execute('', 'netns', ('list',)) return [l.strip() for l in output.split('\n')] @@ -221,8 +206,7 @@ class IpRule(IPWrapper): class IPDevice(SubProcessBase): def __init__(self, name, root_helper=None, namespace=None): - super(IPDevice, self).__init__(root_helper=root_helper, - namespace=namespace) + super(IPDevice, self).__init__(namespace=namespace) self.name = name self.link = IpLinkCommand(self) self.addr = IpAddrCommand(self) @@ -535,7 +519,7 @@ class IpNetnsCommand(IpCommandBase): def add(self, name): self._as_root('add', name, use_root_namespace=True) - wrapper = IPWrapper(self._parent.root_helper, name) + wrapper = IPWrapper(namespace=name) wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.conf.all.promote_secondaries=1']) return wrapper @@ -546,25 +530,23 @@ class IpNetnsCommand(IpCommandBase): def execute(self, cmds, addl_env=None, check_exit_code=True, extra_ok_codes=None): ns_params = [] + kwargs = {} if self._parent.namespace: - self._parent.enforce_root_helper() + kwargs['run_as_root'] = True ns_params = ['ip', 'netns', 'exec', self._parent.namespace] env_params = [] if addl_env: env_params = (['env'] + ['%s=%s' % pair for pair in addl_env.items()]) - return utils.execute( - ns_params + env_params + list(cmds), - root_helper=self._parent.root_helper, - check_exit_code=check_exit_code, extra_ok_codes=extra_ok_codes) + cmd = ns_params + env_params + list(cmds) + return utils.execute(cmd, check_exit_code=check_exit_code, + extra_ok_codes=extra_ok_codes, **kwargs) def exists(self, name): - root_helper = self._parent.root_helper - if not cfg.CONF.AGENT.use_helper_for_ns_read: - root_helper = None - output = self._parent._execute('o', 'netns', ['list'], - root_helper=root_helper) + output = self._parent._execute( + 'o', 'netns', ['list'], + run_as_root=cfg.CONF.AGENT.use_helper_for_ns_read) for line in output.split('\n'): if name == line.strip(): return True @@ -574,7 +556,7 @@ class IpNetnsCommand(IpCommandBase): def device_exists(device_name, root_helper=None, namespace=None): """Return True if the device exists in the namespace.""" try: - dev = IPDevice(device_name, root_helper, namespace) + dev = IPDevice(device_name, namespace=namespace) dev.set_log_fail_as_error(False) address = dev.link.address except RuntimeError: @@ -588,7 +570,7 @@ def device_exists_with_ip_mac(device_name, ip_cidr, mac, namespace=None, exists in the namespace. """ try: - device = IPDevice(device_name, root_helper, namespace) + device = IPDevice(device_name, namespace=namespace) if mac != device.link.address: return False if ip_cidr not in (ip['cidr'] for ip in device.addr.list()): @@ -607,7 +589,7 @@ def get_routing_table(root_helper=None, namespace=None): 'device': device_name} """ - ip_wrapper = IPWrapper(root_helper, namespace=namespace) + ip_wrapper = IPWrapper(namespace=namespace) table = ip_wrapper.netns.execute(['ip', 'route'], check_exit_code=True) routes = [] @@ -630,7 +612,7 @@ def get_routing_table(root_helper=None, namespace=None): def ensure_device_is_ready(device_name, root_helper=None, namespace=None): - dev = IPDevice(device_name, root_helper, namespace) + dev = IPDevice(device_name, namespace=namespace) dev.set_log_fail_as_error(False) try: # Ensure the device is up, even if it is already up. If the device @@ -643,15 +625,15 @@ def ensure_device_is_ready(device_name, root_helper=None, namespace=None): def iproute_arg_supported(command, arg, root_helper=None): command += ['help'] - stdout, stderr = utils.execute(command, root_helper=root_helper, - check_exit_code=False, return_stderr=True) + stdout, stderr = utils.execute(command, check_exit_code=False, + return_stderr=True) return any(arg in line for line in stderr.split('\n')) -def _arping(ns_name, iface_name, address, count, root_helper): +def _arping(ns_name, iface_name, address, count): arping_cmd = ['arping', '-A', '-I', iface_name, '-c', count, address] try: - ip_wrapper = IPWrapper(root_helper, namespace=ns_name) + ip_wrapper = IPWrapper(namespace=ns_name) ip_wrapper.netns.execute(arping_cmd, check_exit_code=True) except Exception: msg = _LE("Failed sending gratuitous ARP " @@ -661,17 +643,18 @@ def _arping(ns_name, iface_name, address, count, root_helper): 'ns': ns_name}) -def send_gratuitous_arp(ns_name, iface_name, address, count, root_helper): +def send_gratuitous_arp(ns_name, iface_name, address, count, root_helper=None): """Send a gratuitous arp using given namespace, interface, and address""" def arping(): - _arping(ns_name, iface_name, address, count, root_helper) + _arping(ns_name, iface_name, address, count) if count > 0: eventlet.spawn_n(arping) -def send_garp_for_proxyarp(ns_name, iface_name, address, count, root_helper): +def send_garp_for_proxyarp(ns_name, iface_name, address, count, + root_helper=None): """ Send a gratuitous arp using given namespace, interface, and address @@ -682,11 +665,11 @@ def send_garp_for_proxyarp(ns_name, iface_name, address, count, root_helper): """ def arping_with_temporary_address(): # Configure the address on the interface - device = IPDevice(iface_name, root_helper, namespace=ns_name) + device = IPDevice(iface_name, namespace=ns_name) net = netaddr.IPNetwork(str(address)) device.addr.add(net.version, str(net), str(net.broadcast)) - _arping(ns_name, iface_name, address, count, root_helper) + _arping(ns_name, iface_name, address, count) # Delete the address from the interface device.addr.delete(net.version, str(net)) diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index 35beac9a6..03f4e0777 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -240,10 +240,6 @@ class PreexistingDeviceFailure(NeutronException): message = _("Creation failed. %(dev_name)s already exists.") -class SudoRequired(NeutronException): - message = _("Sudo privilege is required to run this command.") - - class QuotaResourceUnknown(NotFound): message = _("Unknown quota resources %(unknown)s.") diff --git a/neutron/plugins/sriovnicagent/pci_lib.py b/neutron/plugins/sriovnicagent/pci_lib.py index f1f6a35ab..0a818d924 100644 --- a/neutron/plugins/sriovnicagent/pci_lib.py +++ b/neutron/plugins/sriovnicagent/pci_lib.py @@ -51,8 +51,7 @@ class PciDeviceIPWrapper(ip_lib.IPWrapper): @return: list of assigned mac addresses """ try: - out = self._execute('', "link", ("show", self.dev_name), - self.root_helper) + out = self._execute('', "link", ("show", self.dev_name)) except Exception as e: LOG.exception(_LE("Failed executing ip command")) raise exc.IpCommandError(dev_name=self.dev_name, @@ -73,8 +72,7 @@ class PciDeviceIPWrapper(ip_lib.IPWrapper): @todo: Handle "auto" state """ try: - out = self._execute('', "link", ("show", self.dev_name), - self.root_helper) + out = self._execute('', "link", ("show", self.dev_name)) except Exception as e: LOG.exception(_LE("Failed executing ip command")) raise exc.IpCommandError(dev_name=self.dev_name, @@ -100,8 +98,7 @@ class PciDeviceIPWrapper(ip_lib.IPWrapper): try: self._execute('', "link", ("set", self.dev_name, "vf", - str(vf_index), "state", status_str), - self.root_helper) + str(vf_index), "state", status_str)) except Exception as e: LOG.exception(_LE("Failed executing ip command")) raise exc.IpCommandError(dev_name=self.dev_name, diff --git a/neutron/tests/unit/test_linux_external_process.py b/neutron/tests/unit/test_linux_external_process.py index f208c362b..25fb25292 100644 --- a/neutron/tests/unit/test_linux_external_process.py +++ b/neutron/tests/unit/test_linux_external_process.py @@ -49,7 +49,6 @@ class TestProcessManager(base.BaseTestCase): manager.enable(callback) callback.assert_called_once_with('pidfile') self.execute.assert_called_once_with(['the', 'cmd'], - root_helper='sudo', check_exit_code=True, extra_ok_codes=None) diff --git a/neutron/tests/unit/test_linux_ip_lib.py b/neutron/tests/unit/test_linux_ip_lib.py index 840d3b773..6f7254d05 100644 --- a/neutron/tests/unit/test_linux_ip_lib.py +++ b/neutron/tests/unit/test_linux_ip_lib.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import os - import mock import netaddr @@ -151,31 +149,32 @@ class TestSubProcessBase(base.BaseTestCase): self.execute = self.execute_p.start() def test_execute_wrapper(self): - ip_lib.SubProcessBase._execute('o', 'link', ('list',), 'sudo') + ip_lib.SubProcessBase._execute('o', 'link', ('list',), + run_as_root=True) self.execute.assert_called_once_with(['ip', '-o', 'link', 'list'], - root_helper='sudo', + run_as_root=True, log_fail_as_error=True) def test_execute_wrapper_int_options(self): ip_lib.SubProcessBase._execute([4], 'link', ('list',)) self.execute.assert_called_once_with(['ip', '-4', 'link', 'list'], - root_helper=None, + run_as_root=False, log_fail_as_error=True) def test_execute_wrapper_no_options(self): ip_lib.SubProcessBase._execute([], 'link', ('list',)) self.execute.assert_called_once_with(['ip', 'link', 'list'], - root_helper=None, + run_as_root=False, log_fail_as_error=True) def test_run_no_namespace(self): base = ip_lib.SubProcessBase('sudo') base._run([], 'link', ('list',)) self.execute.assert_called_once_with(['ip', 'link', 'list'], - root_helper=None, + run_as_root=False, log_fail_as_error=True) def test_run_namespace(self): @@ -183,7 +182,7 @@ class TestSubProcessBase(base.BaseTestCase): base._run([], 'link', ('list',)) self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip', 'link', 'list'], - root_helper='sudo', + run_as_root=True, log_fail_as_error=True) def test_as_root_namespace(self): @@ -191,33 +190,9 @@ class TestSubProcessBase(base.BaseTestCase): base._as_root([], 'link', ('list',)) self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip', 'link', 'list'], - root_helper='sudo', + run_as_root=True, log_fail_as_error=True) - def test_enforce_root_helper_no_root_helper(self): - base = ip_lib.SubProcessBase() - not_root = 42 - with mock.patch.object(os, 'geteuid', return_value=not_root): - self.assertRaises(exceptions.SudoRequired, - base.enforce_root_helper) - - def test_enforce_root_helper_with_root_helper_supplied(self): - base = ip_lib.SubProcessBase('sudo') - try: - base.enforce_root_helper() - except exceptions.SudoRequired: - self.fail('enforce_root_helper should not raise SudoRequired ' - 'when a root_helper is supplied.') - - def test_enforce_root_helper_with_no_root_helper_but_root(self): - base = ip_lib.SubProcessBase() - with mock.patch.object(os, 'geteuid', return_value=0): - try: - base.enforce_root_helper() - except exceptions.SudoRequired: - self.fail('enforce_root_helper should not require a root ' - 'helper when run as root.') - class TestIpWrapper(base.BaseTestCase): def setUp(self): @@ -245,7 +220,7 @@ class TestIpWrapper(base.BaseTestCase): ip_lib.IPDevice('bar@bar:bar')]) self.execute.assert_called_once_with(['o', 'd'], 'link', ('list',), - 'sudo', None) + log_fail_as_error=True) def test_get_devices_malformed_line(self): self.execute.return_value = '\n'.join(LINK_SAMPLE + ['gibberish']) @@ -267,7 +242,7 @@ class TestIpWrapper(base.BaseTestCase): ip_lib.IPDevice('bar@bar:bar')]) self.execute.assert_called_once_with(['o', 'd'], 'link', ('list',), - 'sudo', None) + log_fail_as_error=True) def test_get_namespaces(self): self.execute.return_value = '\n'.join(NETNS_SAMPLE) @@ -277,14 +252,13 @@ class TestIpWrapper(base.BaseTestCase): 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'cccccccc-cccc-cccc-cccc-cccccccccccc']) - self.execute.assert_called_once_with('', 'netns', ('list',), - root_helper='sudo') + self.execute.assert_called_once_with('', 'netns', ('list',)) def test_add_tuntap(self): ip_lib.IPWrapper('sudo').add_tuntap('tap0') self.execute.assert_called_once_with('', 'tuntap', ('add', 'tap0', 'mode', 'tap'), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_add_veth(self): @@ -292,14 +266,14 @@ class TestIpWrapper(base.BaseTestCase): self.execute.assert_called_once_with('', 'link', ('add', 'tap0', 'type', 'veth', 'peer', 'name', 'tap1'), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_del_veth(self): ip_lib.IPWrapper('sudo').del_veth('fpr-1234') self.execute.assert_called_once_with('', 'link', ('del', 'fpr-1234'), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_add_veth_with_namespaces(self): @@ -311,12 +285,11 @@ class TestIpWrapper(base.BaseTestCase): ('add', 'tap0', 'type', 'veth', 'peer', 'name', 'tap1', 'netns', ns2), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_get_device(self): dev = ip_lib.IPWrapper('sudo', 'ns').device('eth0') - self.assertEqual(dev.root_helper, 'sudo') self.assertEqual(dev.namespace, 'ns') self.assertEqual(dev.name, 'eth0') @@ -328,9 +301,10 @@ class TestIpWrapper(base.BaseTestCase): ns_exists.return_value = False ip.ensure_namespace('ns') self.execute.assert_has_calls( - [mock.call([], 'netns', ('add', 'ns'), 'sudo', None, + [mock.call([], 'netns', ('add', 'ns'), + run_as_root=True, namespace=None, log_fail_as_error=True)]) - ip_dev.assert_has_calls([mock.call('lo', 'sudo', 'ns'), + ip_dev.assert_has_calls([mock.call('lo', namespace='ns'), mock.call().link.set_up()]) def test_ensure_namespace_existing(self): @@ -422,7 +396,7 @@ class TestIpWrapper(base.BaseTestCase): 'ttl', 'ttl0', 'tos', 'tos0', 'local', 'local0', 'proxy', 'port', '1', '2'], - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_add_vxlan_invalid_port_length(self): @@ -457,7 +431,7 @@ class TestIpRule(base.BaseTestCase): ('add', 'from', ip, 'table', table, 'priority', priority), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def _test_delete_rule(self, ip, table, priority): @@ -466,7 +440,7 @@ class TestIpRule(base.BaseTestCase): self.execute.assert_called_once_with([ip_version], 'rule', ('del', 'table', table, 'priority', priority), - 'sudo', None, + run_as_root=True, namespace=None, log_fail_as_error=True) def test_add_rule_v4(self): @@ -829,14 +803,15 @@ class TestIpNetnsCommand(TestIPCmdBase): execute.assert_called_once_with( ['ip', 'netns', 'exec', 'ns', 'sysctl', '-w', 'net.ipv4.conf.all.promote_secondaries=1'], - root_helper='sudo', check_exit_code=True, extra_ok_codes=None) + run_as_root=True, check_exit_code=True, extra_ok_codes=None) def test_delete_namespace(self): with mock.patch('neutron.agent.linux.utils.execute'): self.netns_cmd.delete('ns') self._assert_sudo([], ('delete', 'ns'), force_root_namespace=True) - def test_namespace_exists(self): + def test_namespace_exists_use_helper(self): + self.config(group='AGENT', use_helper_for_ns_read=True) retval = '\n'.join(NETNS_SAMPLE) # need another instance to avoid mocking netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) @@ -845,10 +820,11 @@ class TestIpNetnsCommand(TestIPCmdBase): self.assertTrue( netns_cmd.exists('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb')) execute.assert_called_once_with(['ip', '-o', 'netns', 'list'], - root_helper=None, + run_as_root=True, log_fail_as_error=True) - def test_namespace_doest_not_exist(self): + def test_namespace_doest_not_exist_no_helper(self): + self.config(group='AGENT', use_helper_for_ns_read=False) retval = '\n'.join(NETNS_SAMPLE) # need another instance to avoid mocking netns_cmd = ip_lib.IpNetnsCommand(ip_lib.SubProcessBase()) @@ -857,7 +833,7 @@ class TestIpNetnsCommand(TestIPCmdBase): self.assertFalse( netns_cmd.exists('bbbbbbbb-1111-2222-3333-bbbbbbbbbbbb')) execute.assert_called_once_with(['ip', '-o', 'netns', 'list'], - root_helper=None, + run_as_root=False, log_fail_as_error=True) def test_execute(self): @@ -866,7 +842,7 @@ class TestIpNetnsCommand(TestIPCmdBase): self.netns_cmd.execute(['ip', 'link', 'list']) execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns', 'ip', 'link', 'list'], - root_helper='sudo', + run_as_root=True, check_exit_code=True, extra_ok_codes=None) @@ -879,7 +855,7 @@ class TestIpNetnsCommand(TestIPCmdBase): ['ip', 'netns', 'exec', 'ns', 'env'] + ['%s=%s' % (k, v) for k, v in env.items()] + ['ip', 'link', 'list'], - root_helper='sudo', check_exit_code=True, extra_ok_codes=None) + run_as_root=True, check_exit_code=True, extra_ok_codes=None) def test_execute_nosudo_with_no_namespace(self): with mock.patch('neutron.agent.linux.utils.execute') as execute: @@ -887,7 +863,6 @@ class TestIpNetnsCommand(TestIPCmdBase): self.parent.root_helper = None self.netns_cmd.execute(['test']) execute.assert_called_once_with(['test'], - root_helper=None, check_exit_code=True, extra_ok_codes=None) @@ -946,8 +921,7 @@ class TestArpPing(TestIPCmdBase): mock.sentinel.root_helper) self.assertTrue(spawn_n.called) - mIPWrapper.assert_called_once_with(mock.sentinel.root_helper, - namespace=mock.sentinel.ns_name) + mIPWrapper.assert_called_once_with(namespace=mock.sentinel.ns_name) ip_wrapper = mIPWrapper(mock.sentinel.root_helper, mock.sentinel.ns_name) @@ -982,7 +956,6 @@ class TestArpPing(TestIPCmdBase): # Check that the address was added to the interface before arping def check_added_address(*args, **kwargs): mIPDevice.assert_called_once_with(mock.sentinel.iface_name, - mock.sentinel.root_helper, namespace=mock.sentinel.ns_name) device.addr.add.assert_called_once_with(4, addr + '/32', addr) self.assertFalse(device.addr.delete.called) -- 2.45.2