from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ovs_lib # noqa
-from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as l3_constants
from neutron.common import legacy
'-c', self.conf.send_arp_for_ha,
ip_address]
try:
- if self.conf.use_namespaces:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- namespace=ri.ns_name())
- ip_wrapper.netns.execute(arping_cmd, check_exit_code=True)
- else:
- utils.execute(arping_cmd, check_exit_code=True,
- root_helper=self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper,
+ namespace=ri.ns_name())
+ ip_wrapper.netns.execute(arping_cmd, check_exit_code=True)
except Exception as e:
LOG.error(_("Failed sending gratuitous ARP: %s"), str(e))
gw_ip = ex_gw_port['subnet']['gateway_ip']
if ex_gw_port['subnet']['gateway_ip']:
cmd = ['route', 'add', 'default', 'gw', gw_ip]
- if self.conf.use_namespaces:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- namespace=ri.ns_name())
- ip_wrapper.netns.execute(cmd, check_exit_code=False)
- else:
- utils.execute(cmd, check_exit_code=False,
- root_helper=self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper,
+ namespace=ri.ns_name())
+ ip_wrapper.netns.execute(cmd, check_exit_code=False)
def external_gateway_removed(self, ri, ex_gw_port,
interface_name, internal_cidrs):
def _update_routing_table(self, ri, operation, route):
cmd = ['ip', 'route', operation, 'to', route['destination'],
'via', route['nexthop']]
- #TODO(nati) move this code to iplib
- if self.conf.use_namespaces:
- ip_wrapper = ip_lib.IPWrapper(self.conf.root_helper,
- namespace=ri.ns_name())
- ip_wrapper.netns.execute(cmd, check_exit_code=False)
- else:
- utils.execute(cmd, check_exit_code=False,
- root_helper=self.conf.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.conf.root_helper,
+ namespace=ri.ns_name())
+ ip_wrapper.netns.execute(cmd, check_exit_code=False)
def routes_updated(self, ri):
new_routes = ri.router['routes']
if self.conf.dhcp_domain:
cmd.append('--domain=%s' % self.conf.dhcp_domain)
- if self.network.namespace:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- self.network.namespace)
- ip_wrapper.netns.execute(cmd, addl_env=env)
- else:
- # For normal sudo prepend the env vars before command
- cmd = ['%s=%s' % pair for pair in env.items()] + cmd
- utils.execute(cmd, self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper,
+ self.network.namespace)
+ ip_wrapper.netns.execute(cmd, addl_env=env)
def _release_lease(self, mac_address, ip):
"""Release a DHCP lease."""
cmd = ['dhcp_release', self.interface_name, ip, mac_address]
- if self.network.namespace:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- self.network.namespace)
- ip_wrapper.netns.execute(cmd)
- else:
- utils.execute(cmd, self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper,
+ self.network.namespace)
+ ip_wrapper.netns.execute(cmd)
def reload_allocations(self):
"""Rebuild the dnsmasq config and signal the dnsmasq to reload."""
if not self.active:
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
- if self.namespace:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
- ip_wrapper.netns.execute(cmd)
- else:
- # For normal sudo prepend the env vars before command
- utils.execute(cmd, self.root_helper)
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
+ ip_wrapper.netns.execute(cmd)
def disable(self):
pid = self.pid
def execute(self, cmds, addl_env={}, check_exit_code=True):
if not self._parent.root_helper:
raise exceptions.SudoRequired()
- elif not self._parent.namespace:
- raise Exception(_('No namespace defined for parent'))
- else:
- env_params = []
- if addl_env:
- env_params = (['env'] +
- ['%s=%s' % pair for pair in addl_env.items()])
- return utils.execute(
- ['ip', 'netns', 'exec', self._parent.namespace] +
- env_params + list(cmds),
- root_helper=self._parent.root_helper,
- check_exit_code=check_exit_code)
+ ns_params = []
+ if self._parent.namespace:
+ ns_params = ['ip', 'netns', 'exec', self._parent.namespace]
+
+ env_params = []
+ if addl_env:
+ env_params = (['env'] +
+ ['%s=%s' % pair for pair in addl_env.items()])
+ return utils.execute(
+ ns_params + env_params + list(cmds),
+ root_helper=self._parent.root_helper,
+ check_exit_code=check_exit_code)
def exists(self, name):
output = self._as_root('list', options='o', use_root_namespace=True)
'-I', interface_name,
'-c', self.conf.send_arp_for_ha,
floating_ip]
- if self.conf.use_namespaces:
- self.mock_ip.netns.execute.assert_any_call(
- arping_cmd, check_exit_code=True)
- else:
- self.utils_exec.assert_any_call(arping_cmd,
- check_exit_code=True,
- root_helper=self.conf.root_helper)
+ self.mock_ip.netns.execute.assert_any_call(
+ arping_cmd, check_exit_code=True)
def test_arping_namespace(self):
self._test_arping(namespace=True)
self._test_external_gateway_action('remove')
def _check_agent_method_called(self, agent, calls, namespace):
- if namespace:
- self.mock_ip.netns.execute.assert_has_calls(
- [mock.call(call, check_exit_code=False) for call in calls],
- any_order=True)
- else:
- self.utils_exec.assert_has_calls([
- mock.call(call, root_helper='sudo',
- check_exit_code=False) for call in calls],
- any_order=True)
+ self.mock_ip.netns.execute.assert_has_calls(
+ [mock.call(call, check_exit_code=False) for call in calls],
+ any_order=True)
def _test_routing_table_update(self, namespace):
if not namespace:
manager.enable(callback)
callback.assert_called_once_with('pidfile')
name.assert_called_once_with(ensure_pids_dir=True)
- self.execute.assert_called_once_with(['the', 'cmd'], 'sudo')
+ self.execute.assert_called_once_with(['the', 'cmd'],
+ root_helper='sudo',
+ check_exit_code=True)
def test_enable_with_namespace(self):
callback = mock.Mock()