agent_conf,
interface_driver,
use_ipv6=False,
- root_helper=None,
ns_name=None):
self.router_id = router_id
self.ex_gw_port = None
config.register_interface_driver_opts_helper(conf)
config.register_use_namespaces_opts_helper(conf)
config.register_agent_state_opts_helper(conf)
- config.register_root_helper(conf)
conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS)
class SubProcessBase(object):
- def __init__(self, root_helper=None, namespace=None,
+ def __init__(self, namespace=None,
log_fail_as_error=True):
self.namespace = namespace
self.log_fail_as_error = log_fail_as_error
class IPWrapper(SubProcessBase):
- def __init__(self, root_helper=None, namespace=None):
+ def __init__(self, namespace=None):
super(IPWrapper, self).__init__(namespace=namespace)
self.netns = IpNetnsCommand(self)
return (IPDevice(name, namespace=self.namespace))
@classmethod
- def get_namespaces(cls, root_helper=None):
+ def get_namespaces(cls):
output = cls._execute('', 'netns', ('list',))
return [l.strip() for l in output.split('\n')]
class IPDevice(SubProcessBase):
- def __init__(self, name, root_helper=None, namespace=None):
+ def __init__(self, name, namespace=None):
super(IPDevice, self).__init__(namespace=namespace)
self.name = name
self.link = IpLinkCommand(self)
return False
-def device_exists(device_name, root_helper=None, namespace=None):
+def device_exists(device_name, namespace=None):
"""Return True if the device exists in the namespace."""
try:
dev = IPDevice(device_name, namespace=namespace)
return bool(address)
-def device_exists_with_ip_mac(device_name, ip_cidr, mac, namespace=None,
- root_helper=None):
+def device_exists_with_ip_mac(device_name, ip_cidr, mac, namespace=None):
"""Return True if the device with the given IP and MAC addresses
exists in the namespace.
"""
return True
-def get_routing_table(root_helper=None, namespace=None):
+def get_routing_table(namespace=None):
"""Return a list of dictionaries, each representing a route.
The dictionary format is: {'destination': cidr,
return routes
-def ensure_device_is_ready(device_name, root_helper=None, namespace=None):
+def ensure_device_is_ready(device_name, namespace=None):
dev = IPDevice(device_name, namespace=namespace)
dev.set_log_fail_as_error(False)
try:
return True
-def iproute_arg_supported(command, arg, root_helper=None):
+def iproute_arg_supported(command, arg):
command += ['help']
stdout, stderr = utils.execute(command, check_exit_code=False,
return_stderr=True)
'ns': ns_name})
-def send_gratuitous_arp(ns_name, iface_name, address, count, root_helper=None):
+def send_gratuitous_arp(ns_name, iface_name, address, count):
"""Send a gratuitous arp using given namespace, interface, and address"""
def arping():
eventlet.spawn_n(arping)
-def send_garp_for_proxyarp(ns_name, iface_name, address, count,
- root_helper=None):
+def send_garp_for_proxyarp(ns_name, iface_name, address, count):
"""
Send a gratuitous arp using given namespace, interface, and address
return obj, cmd
-# TODO(twilson) Remove root_helper argument in favor of run_as_root
-# root_helper= is only intended to handle old-style code that passes the
-# root_helper option exclusively, the run_as_root option is for new-style code
-# that leaves root_helper handling to the execute() function. The two options
-# should not be used in conjunction.
-def execute(cmd, root_helper=None, process_input=None, addl_env=None,
+def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False):
- if root_helper:
- run_as_root = True
try:
obj, cmd = create_process(cmd, run_as_root=run_as_root,
addl_env=addl_env)
conf.register_cli_opts(cli_opts)
agent_config.register_interface_driver_opts_helper(conf)
agent_config.register_use_namespaces_opts_helper(conf)
- agent_config.register_root_helper(conf)
conf.register_opts(dhcp_config.DHCP_AGENT_OPTS)
conf.register_opts(dhcp_config.DHCP_OPTS)
conf.register_opts(dhcp_config.DNSMASQ_OPTS)
if not re.match(NS_MANGLING_PATTERN, namespace):
return False
- root_helper = agent_config.get_root_helper(conf)
- ip = ip_lib.IPWrapper(root_helper, namespace)
+ ip = ip_lib.IPWrapper(namespace=namespace)
return force or ip.namespace_is_empty()
"""
try:
- root_helper = agent_config.get_root_helper(conf)
- ip = ip_lib.IPWrapper(root_helper, namespace)
+ ip = ip_lib.IPWrapper(namespace=namespace)
if force:
kill_dhcp(conf, namespace)
conf()
config.setup_logging()
- root_helper = agent_config.get_root_helper(conf)
# Identify namespaces that are candidates for deletion.
candidates = [ns for ns in
- ip_lib.IPWrapper.get_namespaces(root_helper)
+ ip_lib.IPWrapper.get_namespaces()
if eligible_for_deletion(conf, ns, conf.force)]
if candidates:
import netaddr
from oslo_config import cfg
-from neutron.agent.common import config
from neutron.agent.linux import dhcp
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
def __init__(self, conf, client, driver):
self.conf = conf
- self.root_helper = config.get_root_helper(conf)
self.client = client
self.driver = driver
if self.conf.use_namespaces:
namespace = self._get_namespace(port)
- if ip_lib.device_exists(interface_name, self.root_helper, namespace):
+ if ip_lib.device_exists(interface_name, namespace=namespace):
LOG.debug('Reusing existing device: %s.', interface_name)
else:
self.driver.plug(network.id,
bridge = None
if network.external:
bridge = self.conf.external_network_bridge
- ip = ip_lib.IPWrapper(self.root_helper)
+ ip = ip_lib.IPWrapper()
namespace = self._get_namespace(port)
if self.conf.use_namespaces and ip.netns.exists(namespace):
self.driver.unplug(self.driver.get_device_name(port),
def exec_command(self, port_id, command=None):
port = dhcp.DictModel(self.client.show_port(port_id)['port'])
- ip = ip_lib.IPWrapper(self.root_helper)
+ ip = ip_lib.IPWrapper()
namespace = self._get_namespace(port)
if self.conf.use_namespaces:
if not command:
cfg.CONF.register_opts(debug_agent.NeutronDebugAgent.OPTS)
config.register_interface_driver_opts_helper(cfg.CONF)
config.register_use_namespaces_opts_helper(cfg.CONF)
- config.register_root_helper(cfg.CONF)
cfg.CONF(['--config-file', self.options.config_file])
config.setup_logging()
driver = importutils.import_object(cfg.CONF.interface_driver, cfg.CONF)
def __init__(self, host, conf=None):
self.conf = conf or cfg.CONF
self._load_drivers()
- self.root_helper = config.get_root_helper(self.conf)
self.context = context.get_admin_context_without_session()
self.metering_info = {}
self.metering_loop = loopingcall.FixedIntervalLoopingCall(
conf = cfg.CONF
conf.register_opts(MeteringAgent.Opts)
config.register_agent_state_opts_helper(conf)
- config.register_root_helper(conf)
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
config.register_interface_driver_opts_helper(cfg.CONF)
config.register_use_namespaces_opts_helper(cfg.CONF)
-config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(interface.OPTS)
self.conf = conf
self.id = router['id']
self.router = router
- self.root_helper = config.get_root_helper(self.conf)
self.ns_name = NS_PREFIX + self.id if conf.use_namespaces else None
self.iptables_manager = iptables_manager.IptablesManager(
namespace=self.ns_name,
# under the License.
import netaddr
-from oslo_config import cfg
import testscenarios
-from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
def setUp(self):
super(BaseLinuxTestCase, self).setUp()
- config.register_root_helper(cfg.CONF)
def check_command(self, cmd, error_text, skip_msg, run_as_root=False):
try:
raise
def _create_namespace(self):
- ip_cmd = ip_lib.IPWrapper(self.root_helper)
+ ip_cmd = ip_lib.IPWrapper()
name = "func-%s" % uuidutils.generate_uuid()
namespace = ip_cmd.ensure_namespace(name)
self.addCleanup(namespace.netns.delete, namespace.namespace)
continue
def create_veth(self):
- ip_wrapper = ip_lib.IPWrapper(self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper()
name1 = get_rand_veth_name()
name2 = get_rand_veth_name()
self.addCleanup(ip_wrapper.del_veth, name1)
super(BaseOVSLinuxTestCase, self).setUp()
self.config(group='OVS', ovsdb_interface=self.ovsdb_interface)
self.ovs = ovs_lib.BaseOVS()
- self.ip = ip_lib.IPWrapper(self.root_helper)
+ self.ip = ip_lib.IPWrapper()
def create_ovs_bridge(self, br_prefix=BR_PREFIX):
br = self.create_resource(br_prefix, self.ovs.add_bridge)
:param attr: A Device namedtuple
:return: A tuntap ip_lib.IPDevice
"""
- ip = ip_lib.IPWrapper(self.root_helper, namespace=attr.namespace)
+ ip = ip_lib.IPWrapper(namespace=attr.namespace)
ip.netns.add(attr.namespace)
self.addCleanup(ip.netns.delete, attr.namespace)
tap_device = ip.add_tuntap(attr.name)
attr = self.generate_device_details()
self.assertFalse(
- ip_lib.device_exists(attr.name, self.root_helper,
- attr.namespace))
+ ip_lib.device_exists(attr.name, namespace=attr.namespace))
device = self.manage_device(attr)
self.assertTrue(
- ip_lib.device_exists(device.name, self.root_helper,
- attr.namespace))
+ ip_lib.device_exists(device.name, namespace=attr.namespace))
device.link.delete()
self.assertFalse(
- ip_lib.device_exists(attr.name, self.root_helper,
- attr.namespace))
+ ip_lib.device_exists(attr.name, namespace=attr.namespace))
def test_device_exists_with_ip_mac(self):
attr = self.generate_device_details()
device = self.manage_device(attr)
self.assertTrue(
- ip_lib.device_exists_with_ip_mac(
- *attr, root_helper=self.root_helper))
+ ip_lib.device_exists_with_ip_mac(*attr))
wrong_ip_cidr = '10.0.0.1/8'
wrong_mac_address = 'aa:aa:aa:aa:aa:aa'
attr = self.generate_device_details(name='wrong_name')
self.assertFalse(
- ip_lib.device_exists_with_ip_mac(
- *attr, root_helper=self.root_helper))
+ ip_lib.device_exists_with_ip_mac(*attr))
attr = self.generate_device_details(ip_cidr=wrong_ip_cidr)
- self.assertFalse(
- ip_lib.device_exists_with_ip_mac(
- *attr, root_helper=self.root_helper))
+ self.assertFalse(ip_lib.device_exists_with_ip_mac(*attr))
attr = self.generate_device_details(mac_address=wrong_mac_address)
- self.assertFalse(
- ip_lib.device_exists_with_ip_mac(
- *attr, root_helper=self.root_helper))
+ self.assertFalse(ip_lib.device_exists_with_ip_mac(*attr))
attr = self.generate_device_details(namespace='wrong_namespace')
- self.assertFalse(
- ip_lib.device_exists_with_ip_mac(
- *attr, root_helper=self.root_helper))
+ self.assertFalse(ip_lib.device_exists_with_ip_mac(*attr))
device.link.delete()
'destination': str(
netaddr.IPNetwork(attr.ip_cidr).cidr)}]
- routes = ip_lib.get_routing_table(self.root_helper, attr.namespace)
+ routes = ip_lib.get_routing_table(namespace=attr.namespace)
self.assertEqual(expected_routes, routes)
"""
import eventlet
+from oslo_config import cfg
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.linux import utils
def setUp(self):
super(BaseMonitorTest, self).setUp()
- rootwrap_not_configured = (self.root_helper ==
+ rootwrap_not_configured = (cfg.CONF.AGENT.root_helper ==
functional_base.SUDO_CMD)
if rootwrap_not_configured:
# The monitor tests require a nested invocation that has
# to be emulated by double sudo if rootwrap is not
# configured.
- self.root_helper = '%s %s' % (self.root_helper, self.root_helper)
- self.config(group='AGENT', root_helper=self.root_helper)
+ self.config(group='AGENT',
+ root_helper=" ".join([functional_base.SUDO_CMD] * 2))
self._check_test_requirements()
self.bridge = self.create_ovs_bridge()
'interface_driver',
'neutron.agent.linux.interface.OVSInterfaceDriver')
conf.set_override('router_delete_namespaces', True)
- conf.set_override('root_helper', self.root_helper, group='AGENT')
br_int = self.create_ovs_bridge()
br_ex = self.create_ovs_bridge()
router.router[l3_constants.FLOATINGIP_KEY].append(fip)
def _namespace_exists(self, namespace):
- ip = ip_lib.IPWrapper(self.root_helper, namespace)
+ ip = ip_lib.IPWrapper(namespace=namespace)
return ip.netns.exists(namespace)
def _metadata_proxy_exists(self, conf, router):
def device_exists_with_ip_mac(self, expected_device, name_getter,
namespace):
return ip_lib.device_exists_with_ip_mac(
- name_getter(expected_device['id']),
- expected_device['ip_cidr'],
- expected_device['mac_address'],
- namespace, self.root_helper)
+ name_getter(expected_device['id']), expected_device['ip_cidr'],
+ expected_device['mac_address'], namespace)
def get_expected_keepalive_configuration(self, router):
ha_confs_path = self.agent.conf.ha_confs_path
device, self.agent.get_internal_device_name, router.ns_name))
def _assert_extra_routes(self, router):
- routes = ip_lib.get_routing_table(self.root_helper, router.ns_name)
+ routes = ip_lib.get_routing_table(namespace=router.ns_name)
routes = [{'nexthop': route['nexthop'],
'destination': route['destination']} for route in routes]
external_device_name = self.agent.get_external_device_name(
external_port['id'])
external_device = ip_lib.IPDevice(external_device_name,
- self.root_helper,
- router.ns_name)
+ namespace=router.ns_name)
existing_gateway = (
external_device.route.get_gateway().get('gateway'))
expected_gateway = external_port['subnet']['gateway_ip']
self.agent.get_external_device_name(external_port['id']),
'%s/32' % fip['floating_ip_address'],
external_port['mac_address'],
- router.ns_name, self.root_helper) for fip in floating_ips)
+ namespace=router.ns_name) for fip in floating_ips)
def _assert_ha_device(self, router):
self.assertTrue(self.device_exists_with_ip_mac(
router.get_ha_device_name, router.ns_name))
def _assert_no_ip_addresses_on_interface(self, router, interface):
- device = ip_lib.IPDevice(interface, self.root_helper, router.ns_name)
+ device = ip_lib.IPDevice(interface, namespace=router.ns_name)
self.assertEqual([], device.addr.list())
def test_ha_router_conf_on_restarted_agent(self):
device_name = router1.get_ha_device_name(
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
- ha_device = ip_lib.IPDevice(device_name, self.root_helper,
- router1.ns_name)
+ ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name)
ha_device.link.set_down()
helpers.wait_until_true(lambda: router2.ha_state == 'master')
external_device_name = self.agent.get_external_device_name(
external_port['id'])
external_device = ip_lib.IPDevice(external_device_name,
- self.root_helper,
- namespace)
+ namespace=namespace)
existing_gateway = (
external_device.route.get_gateway().get('gateway'))
expected_gateway = external_port['subnet']['gateway_ip']
fip_ns.get_ext_device_name(external_gw_port['id']),
external_gw_port['ip_cidr'],
external_gw_port['mac_address'],
- fip_ns_name, self.root_helper)
+ namespace=fip_ns_name)
self.assertTrue(fg_port_created_succesfully)
# Check fpr-router device has been created
device_name = fip_ns.get_int_device_name(router.router_id)
fpr_router_device_created_succesfully = ip_lib.device_exists(
- device_name, self.root_helper, fip_ns_name)
+ device_name, namespace=fip_ns_name)
self.assertTrue(fpr_router_device_created_succesfully)
# In the router namespace
for fip in floating_ips:
device_name = fip_ns.get_rtr_ext_device_name(router.router_id)
self.assertTrue(ip_lib.device_exists(
- device_name, self.root_helper, router.ns_name))
+ device_name, namespace=router.ns_name))
self.fail_on_missing_deps = (
base.bool_from_env('OS_FAIL_ON_MISSING_DEPS'))
- self.root_helper = os.environ.get('OS_ROOTWRAP_CMD', SUDO_CMD)
config.register_root_helper(cfg.CONF)
- cfg.CONF.set_override('root_helper', self.root_helper, group='AGENT')
+ self.config(group='AGENT',
+ root_helper=os.environ.get('OS_ROOTWRAP_CMD', SUDO_CMD))
def check_sudo_enabled(self):
if not self.sudo_enabled:
expected_calls_and_values = [
(mock.call(["ovs-vsctl", self.TO, '--', "--may-exist", "add-port",
- self.BR_NAME, pname], root_helper=self.root_helper),
+ self.BR_NAME, pname]),
None),
(mock.call(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "type=gre"], root_helper=self.root_helper),
+ pname, "type=gre"]),
None),
....
]
def setUp(self):
super(TestOvsdbMonitor, self).setUp()
- self.root_helper = 'sudo'
self.monitor = ovsdb_monitor.OvsdbMonitor('Interface')
def read_output_queues_and_returns_result(self, output_type, output):
def setUp(self):
super(TestSimpleInterfaceMonitor, self).setUp()
- self.root_helper = 'sudo'
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
def test_is_active_is_false_by_default(self):
class AgentUtilsExecuteTest(base.BaseTestCase):
def setUp(self):
super(AgentUtilsExecuteTest, self).setUp()
- self.root_helper = "echo"
self.test_file = self.get_temp_file_path('test_execute.tmp')
open(self.test_file, 'w').close()
self.process = mock.patch('eventlet.green.subprocess.Popen').start()
def test_with_helper(self):
expected = "ls %s\n" % self.test_file
self.mock_popen.return_value = [expected, ""]
- result = utils.execute(["ls", self.test_file],
- self.root_helper)
+ self.config(group='AGENT', root_helper='echo')
+ result = utils.execute(["ls", self.test_file], run_as_root=True)
self.assertEqual(result, expected)
def test_stderr_true(self):
def test_defaults(self):
self.assertEqual('br-int', config.CONF.OVS.integration_bridge)
self.assertEqual(2, config.CONF.AGENT.polling_interval)
- self.assertEqual('sudo', config.CONF.AGENT.root_helper)
self.assertEqual('127.0.0.1', config.CONF.OFC.host)
self.assertEqual('8888', config.CONF.OFC.port)
cfg.IntOpt('ofp-tcp-listen-port', default=6633,
help='openflow tcp listen port')
])
- cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT')
super(OFATestBase, self).setup_config()
def setUp(self):
super(TestOFANeutronAgentBridge, self).setUp()
self.br_name = 'bridge1'
- self.root_helper = 'fake_helper'
self.ovs = self.mod_agent.Bridge(
self.br_name, self.ryuapp)
self.iptables_cls.return_value = self.iptables_inst
cfg.CONF.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver')
- cfg.CONF.set_override('root_helper',
- 'fake_sudo',
- 'AGENT')
self.metering = iptables_driver.IptablesMeteringDriver('metering',
cfg.CONF)
import mock
from oslo_config import cfg
-from neutron.agent.common import config
from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
def setUp(self):
super(TestMeteringOperations, self).setUp()
cfg.CONF.register_opts(metering_agent.MeteringAgent.Opts)
- config.register_root_helper(cfg.CONF)
self.noop_driver = ('neutron.services.metering.drivers.noop.'
'noop_driver.NoopMeteringDriver')
def setUp(self):
super(TestMeteringDriver, self).setUp()
cfg.CONF.register_opts(metering_agent.MeteringAgent.Opts)
- config.register_root_helper(cfg.CONF)
self.noop_driver = ('neutron.services.metering.drivers.noop.'
'noop_driver.NoopMeteringDriver')
common_config.init([])
config.register_interface_driver_opts_helper(cfg.CONF)
config.register_use_namespaces_opts_helper(cfg.CONF)
- config.register_root_helper(cfg.CONF)
cfg.CONF.set_override('use_namespaces', True)
device_exists_p = mock.patch(
super(IptablesManagerStateFulTestCase, self).setUp()
cfg.CONF.register_opts(a_cfg.IPTABLES_OPTS, 'AGENT')
cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')
- self.root_helper = 'sudo'
self.iptables = iptables_manager.IptablesManager()
self.execute = mock.patch.object(self.iptables, "execute").start()
log_fail_as_error=True)
def test_run_no_namespace(self):
- base = ip_lib.SubProcessBase('sudo')
+ base = ip_lib.SubProcessBase()
base._run([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'link', 'list'],
run_as_root=False,
log_fail_as_error=True)
def test_run_namespace(self):
- base = ip_lib.SubProcessBase('sudo', 'ns')
+ base = ip_lib.SubProcessBase(namespace='ns')
base._run([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns',
'ip', 'link', 'list'],
log_fail_as_error=True)
def test_as_root_namespace(self):
- base = ip_lib.SubProcessBase('sudo', 'ns')
+ base = ip_lib.SubProcessBase(namespace='ns')
base._as_root([], 'link', ('list',))
self.execute.assert_called_once_with(['ip', 'netns', 'exec', 'ns',
'ip', 'link', 'list'],
def test_get_devices(self):
self.execute.return_value = '\n'.join(LINK_SAMPLE)
- retval = ip_lib.IPWrapper('sudo').get_devices()
+ retval = ip_lib.IPWrapper().get_devices()
self.assertEqual(retval,
[ip_lib.IPDevice('lo'),
ip_lib.IPDevice('eth0'),
def test_get_devices_malformed_line(self):
self.execute.return_value = '\n'.join(LINK_SAMPLE + ['gibberish'])
- retval = ip_lib.IPWrapper('sudo').get_devices()
+ retval = ip_lib.IPWrapper().get_devices()
self.assertEqual(retval,
[ip_lib.IPDevice('lo'),
ip_lib.IPDevice('eth0'),
def test_get_namespaces(self):
self.execute.return_value = '\n'.join(NETNS_SAMPLE)
- retval = ip_lib.IPWrapper.get_namespaces('sudo')
+ retval = ip_lib.IPWrapper.get_namespaces()
self.assertEqual(retval,
['12345678-1234-5678-abcd-1234567890ab',
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
self.execute.assert_called_once_with('', 'netns', ('list',))
def test_add_tuntap(self):
- ip_lib.IPWrapper('sudo').add_tuntap('tap0')
+ ip_lib.IPWrapper().add_tuntap('tap0')
self.execute.assert_called_once_with('', 'tuntap',
('add', 'tap0', 'mode', 'tap'),
run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_veth(self):
- ip_lib.IPWrapper('sudo').add_veth('tap0', 'tap1')
+ ip_lib.IPWrapper().add_veth('tap0', 'tap1')
self.execute.assert_called_once_with('', 'link',
('add', 'tap0', 'type', 'veth',
'peer', 'name', 'tap1'),
log_fail_as_error=True)
def test_del_veth(self):
- ip_lib.IPWrapper('sudo').del_veth('fpr-1234')
+ ip_lib.IPWrapper().del_veth('fpr-1234')
self.execute.assert_called_once_with('', 'link',
('del', 'fpr-1234'),
run_as_root=True, namespace=None,
def test_add_veth_with_namespaces(self):
ns2 = 'ns2'
with mock.patch.object(ip_lib.IPWrapper, 'ensure_namespace') as en:
- ip_lib.IPWrapper('sudo').add_veth('tap0', 'tap1', namespace2=ns2)
+ ip_lib.IPWrapper().add_veth('tap0', 'tap1', namespace2=ns2)
en.assert_has_calls([mock.call(ns2)])
self.execute.assert_called_once_with('', 'link',
('add', 'tap0', 'type', 'veth',
log_fail_as_error=True)
def test_get_device(self):
- dev = ip_lib.IPWrapper('sudo', 'ns').device('eth0')
+ dev = ip_lib.IPWrapper(namespace='ns').device('eth0')
self.assertEqual(dev.namespace, 'ns')
self.assertEqual(dev.name, 'eth0')
def test_ensure_namespace(self):
with mock.patch.object(ip_lib, 'IPDevice') as ip_dev:
- ip = ip_lib.IPWrapper('sudo')
+ ip = ip_lib.IPWrapper()
with mock.patch.object(ip.netns, 'exists') as ns_exists:
with mock.patch('neutron.agent.linux.utils.execute'):
ns_exists.return_value = False
def test_ensure_namespace_existing(self):
with mock.patch.object(ip_lib, 'IpNetnsCommand') as ip_ns_cmd:
ip_ns_cmd.exists.return_value = True
- ns = ip_lib.IPWrapper('sudo').ensure_namespace('ns')
+ ns = ip_lib.IPWrapper().ensure_namespace('ns')
self.assertFalse(self.execute.called)
self.assertEqual(ns.namespace, 'ns')
def test_namespace_is_empty_no_devices(self):
- ip = ip_lib.IPWrapper('sudo', 'ns')
+ ip = ip_lib.IPWrapper(namespace='ns')
with mock.patch.object(ip, 'get_devices') as get_devices:
get_devices.return_value = []
get_devices.assert_called_once_with(exclude_loopback=True)
def test_namespace_is_empty(self):
- ip = ip_lib.IPWrapper('sudo', 'ns')
+ ip = ip_lib.IPWrapper(namespace='ns')
with mock.patch.object(ip, 'get_devices') as get_devices:
get_devices.return_value = [mock.Mock()]
def test_garbage_collect_namespace_does_not_exist(self):
with mock.patch.object(ip_lib, 'IpNetnsCommand') as ip_ns_cmd_cls:
ip_ns_cmd_cls.return_value.exists.return_value = False
- ip = ip_lib.IPWrapper('sudo', 'ns')
+ ip = ip_lib.IPWrapper(namespace='ns')
with mock.patch.object(ip, 'namespace_is_empty') as mock_is_empty:
self.assertFalse(ip.garbage_collect_namespace())
with mock.patch.object(ip_lib, 'IpNetnsCommand') as ip_ns_cmd_cls:
ip_ns_cmd_cls.return_value.exists.return_value = True
- ip = ip_lib.IPWrapper('sudo', 'ns')
+ ip = ip_lib.IPWrapper(namespace='ns')
with mock.patch.object(ip, 'namespace_is_empty') as mock_is_empty:
mock_is_empty.return_value = True
with mock.patch.object(ip_lib, 'IpNetnsCommand') as ip_ns_cmd_cls:
ip_ns_cmd_cls.return_value.exists.return_value = True
- ip = ip_lib.IPWrapper('sudo', 'ns')
+ ip = ip_lib.IPWrapper(namespace='ns')
with mock.patch.object(ip, 'namespace_is_empty') as mock_is_empty:
mock_is_empty.return_value = False
ip_ns_cmd_cls.mock_calls)
def test_add_vxlan_valid_port_length(self):
- retval = ip_lib.IPWrapper('sudo').add_vxlan('vxlan0', 'vni0',
- group='group0',
- dev='dev0', ttl='ttl0',
- tos='tos0',
- local='local0', proxy=True,
- port=('1', '2'))
+ retval = ip_lib.IPWrapper().add_vxlan('vxlan0', 'vni0',
+ group='group0',
+ dev='dev0', ttl='ttl0',
+ tos='tos0',
+ local='local0', proxy=True,
+ port=('1', '2'))
self.assertIsInstance(retval, ip_lib.IPDevice)
self.assertEqual(retval.name, 'vxlan0')
self.execute.assert_called_once_with('', 'link',
log_fail_as_error=True)
def test_add_vxlan_invalid_port_length(self):
- wrapper = ip_lib.IPWrapper('sudo')
+ wrapper = ip_lib.IPWrapper()
self.assertRaises(exceptions.NetworkVxlanPortRangeError,
wrapper.add_vxlan, 'vxlan0', 'vni0', group='group0',
dev='dev0', ttl='ttl0', tos='tos0',
def test_add_device_to_namespace(self):
dev = mock.Mock()
- ip_lib.IPWrapper('sudo', 'ns').add_device_to_namespace(dev)
+ ip_lib.IPWrapper(namespace='ns').add_device_to_namespace(dev)
dev.assert_has_calls([mock.call.link.set_netns('ns')])
def test_add_device_to_namespace_is_none(self):
dev = mock.Mock()
- ip_lib.IPWrapper('sudo').add_device_to_namespace(dev)
+ ip_lib.IPWrapper().add_device_to_namespace(dev)
self.assertEqual(dev.mock_calls, [])
def _test_add_rule(self, ip, table, priority):
ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule('sudo').add(ip, table, priority)
+ ip_lib.IpRule().add(ip, table, priority)
call_1 = mock.call([ip_version], 'rule', ['show'],
run_as_root=True, namespace=None,
log_fail_as_error=True)
def _test_add_rule_exists(self, ip, table, priority, output):
self.execute.return_value = output
ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule('sudo').add(ip, table, priority)
+ ip_lib.IpRule().add(ip, table, priority)
self.execute.assert_called_once_with([ip_version], 'rule',
['show'],
run_as_root=True, namespace=None,
def _test_delete_rule(self, ip, table, priority):
ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule('sudo').delete(ip, table, priority)
+ ip_lib.IpRule().delete(ip, table, priority)
self.execute.assert_called_once_with([ip_version], 'rule',
('del', 'table', table,
'priority', priority),
self.assertEqual(dev1, dev2)
def test_eq_diff_namespace(self):
- dev1 = ip_lib.IPDevice('tap0', 'sudo', 'ns1')
- dev2 = ip_lib.IPDevice('tap0', 'sudo', 'ns2')
+ dev1 = ip_lib.IPDevice('tap0', namespace='ns1')
+ dev2 = ip_lib.IPDevice('tap0', namespace='ns2')
self.assertNotEqual(dev1, dev2)
def test_eq_other_is_none(self):
- dev1 = ip_lib.IPDevice('tap0', 'sudo', 'ns1')
+ dev1 = ip_lib.IPDevice('tap0', namespace='ns1')
self.assertIsNotNone(dev1)
def test_str(self):
def setUp(self):
super(TestIPCommandBase, self).setUp()
self.ip = mock.Mock()
- self.ip.root_helper = 'sudo'
self.ip.namespace = 'namespace'
self.ip_cmd = ip_lib.IpCommandBase(self.ip)
self.ip_cmd.COMMAND = 'foo'
super(TestIPDeviceCommandBase, self).setUp()
self.ip_dev = mock.Mock()
self.ip_dev.name = 'eth0'
- self.ip_dev.root_helper = 'sudo'
self.ip_dev._execute = mock.Mock(return_value='executed')
self.ip_cmd = ip_lib.IpDeviceCommandBase(self.ip_dev)
self.ip_cmd.COMMAND = 'foo'
super(TestIPCmdBase, self).setUp()
self.parent = mock.Mock()
self.parent.name = 'eth0'
- self.parent.root_helper = 'sudo'
def _assert_call(self, options, args):
self.parent.assert_has_calls([
def test_execute_nosudo_with_no_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute') as execute:
self.parent.namespace = None
- self.parent.root_helper = None
self.netns_cmd.execute(['test'])
execute.assert_called_once_with(['test'],
check_exit_code=True,
function(mock.sentinel.ns_name,
mock.sentinel.iface_name,
address,
- mock.sentinel.count,
- mock.sentinel.root_helper)
+ mock.sentinel.count)
self.assertTrue(spawn_n.called)
mIPWrapper.assert_called_once_with(namespace=mock.sentinel.ns_name)
- ip_wrapper = mIPWrapper(mock.sentinel.root_helper,
- mock.sentinel.ns_name)
+ ip_wrapper = mIPWrapper(namespace=mock.sentinel.ns_name)
# Just test that arping is called with the right arguments
arping_cmd = ['arping', '-A',
@mock.patch('eventlet.spawn_n')
def test_send_garp_for_proxy_arp(self, spawn_n, mIPWrapper, mIPDevice):
addr = '20.0.0.1'
- ip_wrapper = mIPWrapper(mock.sentinel.root_helper,
- mock.sentinel.ns_name)
+ ip_wrapper = mIPWrapper(namespace=mock.sentinel.ns_name)
mIPWrapper.reset_mock()
device = mIPDevice(mock.sentinel.iface_name,
- mock.sentinel.root_helper,
namespace=mock.sentinel.ns_name)
mIPDevice.reset_mock()
# Test that the address was removed after arping
device = mIPDevice(mock.sentinel.iface_name,
- mock.sentinel.root_helper,
namespace=mock.sentinel.ns_name)
device.addr.delete.assert_called_once_with(4, addr + '/32')
self.assertEqual(util.eligible_for_deletion(conf, ns, force),
expected)
- expected_calls = [mock.call(conf.AGENT.root_helper, ns)]
+ expected_calls = [mock.call(namespace=ns)]
if not force:
expected_calls.append(mock.call().namespace_is_empty())
ip_wrap.assert_has_calls(expected_calls)
def _test_destroy_namespace_helper(self, force, num_devices):
ns = 'qrouter-6e322ac7-ab50-4f53-9cdc-d1d3c1164b6d'
conf = mock.Mock()
-# conf.AGENT.root_helper = 'sudo'
lo_device = mock.Mock()
lo_device.name = 'lo'
with mock.patch.object(util, 'kill_dhcp') as kill_dhcp:
util.destroy_namespace(conf, ns, force)
- expected = [mock.call(conf.AGENT.root_helper, ns)]
+ expected = [mock.call(namespace=ns)]
if force:
expected.extend([
def test_destroy_namespace_exception(self):
ns = 'qrouter-6e322ac7-ab50-4f53-9cdc-d1d3c1164b6d'
conf = mock.Mock()
- conf.AGENT.root_helper = 'sudo'
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip_wrap:
ip_wrap.side_effect = Exception()
util.destroy_namespace(conf, ns)
mock.call(conf, 'ns2', False)])
ip_wrap.assert_has_calls(
- [mock.call.get_namespaces(conf.AGENT.root_helper)])
+ [mock.call.get_namespaces()])
time_sleep.assert_called_once_with(2)
util.main()
ip_wrap.assert_has_calls(
- [mock.call.get_namespaces(conf.AGENT.root_helper)])
+ [mock.call.get_namespaces()])
mocks['eligible_for_deletion'].assert_has_calls(
[mock.call(conf, 'ns1', False),
self.assertEqual(conf.external_network_bridge, 'br-ex')
self.assertEqual(conf.ovs_integration_bridge, 'br-int')
self.assertFalse(conf.ovs_all_ports)
- self.assertEqual(conf.AGENT.root_helper, 'sudo')
def test_main(self):
bridges = ['br-int', 'br-ex']
ports = ['p1', 'p2', 'p3']
conf = mock.Mock()
- conf.AGENT.root_helper = 'dummy_sudo'
conf.ovs_all_ports = False
conf.ovs_integration_bridge = 'br-int'
conf.external_network_bridge = 'br-ex'