Remove the root_helper arg from the L3 agent.
Change-Id: Ib66813d9823618cdc3caf48673721173f0bdf5fc
Partially-Implements: blueprint rootwrap-daemon-mode
Depends-On: I1e3b64e5a1d6cff2aebc638710487bbdbdba61d4
from oslo_utils import importutils
from oslo_utils import timeutils
-from neutron.agent.common import config
from neutron.agent.l3 import dvr
from neutron.agent.l3 import dvr_fip_ns
from neutron.agent.l3 import dvr_router
self.conf = conf
else:
self.conf = cfg.CONF
- self.root_helper = config.get_root_helper(self.conf)
self.router_info = {}
self._check_config_params()
the database as being hosted on this node.
"""
try:
- root_ip = ip_lib.IPWrapper(self.root_helper)
+ root_ip = ip_lib.IPWrapper()
- host_namespaces = root_ip.get_namespaces(self.root_helper)
+ host_namespaces = root_ip.get_namespaces()
return set(ns for ns in host_namespaces
if (ns.startswith(NS_PREFIX)
or ns.startswith(dvr.SNAT_NS_PREFIX)))
router_id = self.get_router_id(ns)
if router_id in self.router_info:
self.router_info[router_id].radvd.disable()
- ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
+ ns_ip = ip_lib.IPWrapper(namespace=ns)
for d in ns_ip.get_devices(exclude_loopback=True):
if d.name.startswith(INTERNAL_DEV_PREFIX):
# device is on default bridge
self._delete_namespace(ns_ip, ns)
def _create_namespace(self, name):
- ip_wrapper_root = ip_lib.IPWrapper(self.root_helper)
+ ip_wrapper_root = ip_lib.IPWrapper()
ip_wrapper = ip_wrapper_root.ensure_namespace(name)
ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
if self.use_ipv6:
args = []
kwargs = {
'router_id': router_id,
- 'root_helper': self.root_helper,
'router': router,
'use_ipv6': self.use_ipv6,
'ns_name': ns_name,
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
def _get_existing_devices(self, ri):
- ip_wrapper = ip_lib.IPWrapper(root_helper=self.root_helper,
- namespace=ri.ns_name)
+ ip_wrapper = ip_lib.IPWrapper(namespace=ri.ns_name)
ip_devs = ip_wrapper.get_devices(exclude_loopback=True)
return [ip_dev.name for ip_dev in ip_devs]
def _get_external_device_interface_name(self, ri, ex_gw_port):
if ri.router['distributed']:
fip_int = ri.fip_ns.get_int_device_name(ri.router_id)
- if ip_lib.device_exists(fip_int,
- root_helper=self.root_helper,
- namespace=ri.fip_ns.get_name()):
+ if ip_lib.device_exists(fip_int, namespace=ri.fip_ns.get_name()):
return ri.fip_ns.get_rtr_ext_device_name(ri.router_id)
else:
return self.get_external_device_name(ex_gw_port['id'])
ip_lib.send_gratuitous_arp(ri.ns_name,
interface_name,
fip_ip,
- self.conf.send_arp_for_ha,
- self.root_helper)
+ self.conf.send_arp_for_ha)
return l3_constants.FLOATINGIP_STATUS_ACTIVE
def _remove_floating_ip(self, ri, device, ip_cidr):
else:
net = netaddr.IPNetwork(ip_cidr)
device.addr.delete(net.version, ip_cidr)
- self.driver.delete_conntrack_state(root_helper=self.root_helper,
- namespace=ri.ns_name,
+ self.driver.delete_conntrack_state(namespace=ri.ns_name,
ip=ip_cidr)
if ri.router['distributed']:
ri.floating_ip_removed_dist(ip_cidr)
ri.router['id'])
return fip_statuses
- device = ip_lib.IPDevice(interface_name, self.root_helper,
- namespace=ri.ns_name)
+ device = ip_lib.IPDevice(interface_name, namespace=ri.ns_name)
existing_cidrs = self._get_router_cidrs(ri, device)
new_cidrs = set()
def external_gateway_added(self, ri, ex_gw_port, interface_name):
if ri.router['distributed']:
- ip_wrapr = ip_lib.IPWrapper(self.root_helper, namespace=ri.ns_name)
+ ip_wrapr = ip_lib.IPWrapper(namespace=ri.ns_name)
ip_wrapr.netns.execute(['sysctl', '-w',
'net.ipv4.conf.all.send_redirects=0'])
snat_ports = self.get_snat_interfaces(ri)
def _external_gateway_added(self, ri, ex_gw_port, interface_name,
ns_name, preserve_ips):
- if not ip_lib.device_exists(interface_name,
- root_helper=self.root_helper,
- namespace=ns_name):
+ if not ip_lib.device_exists(interface_name, namespace=ns_name):
self.driver.plug(ex_gw_port['network_id'],
ex_gw_port['id'], interface_name,
ex_gw_port['mac_address'],
ip_lib.send_gratuitous_arp(ns_name,
interface_name,
ip_address,
- self.conf.send_arp_for_ha,
- self.root_helper)
+ self.conf.send_arp_for_ha)
def external_gateway_removed(self, ri, ex_gw_port, interface_name):
if ri.router['distributed']:
def _internal_network_added(self, ns_name, network_id, port_id,
internal_cidr, mac_address,
interface_name, prefix, is_ha=False):
- if not ip_lib.device_exists(interface_name,
- root_helper=self.root_helper,
- namespace=ns_name):
+ if not ip_lib.device_exists(interface_name, namespace=ns_name):
self.driver.plug(network_id, port_id, interface_name, mac_address,
namespace=ns_name,
prefix=prefix)
ip_lib.send_gratuitous_arp(ns_name,
interface_name,
ip_address,
- self.conf.send_arp_for_ha,
- self.root_helper)
+ self.conf.send_arp_for_ha)
def internal_network_added(self, ri, port):
network_id = port['network_id']
ns_name = self.get_snat_ns_name(ri.router['id'])
prefix = dvr.SNAT_INT_DEV_PREFIX
if ip_lib.device_exists(snat_interface,
- root_helper=self.root_helper,
namespace=ns_name):
self.driver.unplug(snat_interface, namespace=ns_name,
prefix=prefix)
- if ip_lib.device_exists(interface_name,
- root_helper=self.root_helper,
- namespace=ri.ns_name):
+ if ip_lib.device_exists(interface_name, namespace=ri.ns_name):
if ri.is_ha:
ri._clear_vips(interface_name)
self.driver.unplug(interface_name, namespace=ri.ns_name,
fip_ns = dvr_fip_ns.FipNamespace(ext_net_id,
self.conf,
self.driver,
- self.root_helper,
self.use_ipv6)
self._fip_namespaces[ext_net_id] = fip_ns
return fip_ns
def _destroy_snat_namespace(self, ns):
- ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
+ ns_ip = ip_lib.IPWrapper(namespace=ns)
# delete internal interfaces
for d in ns_ip.get_devices(exclude_loopback=True):
if d.name.startswith(SNAT_INT_DEV_PREFIX):
try:
ip_cidr = sn_port['ip_cidr']
snat_idx = self._get_snat_idx(ip_cidr)
- ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
- ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
- namespace=ri.ns_name)
+ ns_ipr = ip_lib.IpRule(namespace=ri.ns_name)
+ ns_ipd = ip_lib.IPDevice(sn_int, namespace=ri.ns_name)
ns_ipd.route.add_gateway(gateway, table=snat_idx)
ns_ipr.add(ip_cidr, snat_idx, snat_idx)
ns_ipr.netns.execute(['sysctl', '-w', 'net.ipv4.conf.%s.'
try:
ip_cidr = sn_port['ip_cidr']
snat_idx = self._get_snat_idx(ip_cidr)
- ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
- ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
- namespace=ri.ns_name)
+ ns_ipr = ip_lib.IpRule(namespace=ri.ns_name)
+ ns_ipd = ip_lib.IPDevice(sn_int, namespace=ri.ns_name)
ns_ipd.route.delete_gateway(table=snat_idx)
ns_ipr.delete(ip_cidr, snat_idx, snat_idx)
except Exception:
# TODO(mrsmith): optimize the calls below for bulk calls
net = netaddr.IPNetwork(ip_cidr)
interface_name = self.get_internal_device_name(port['id'])
- device = ip_lib.IPDevice(interface_name, self.root_helper,
- namespace=ri.ns_name)
+ device = ip_lib.IPDevice(interface_name, namespace=ri.ns_name)
if operation == 'add':
device.neigh.add(net.version, ip, mac)
elif operation == 'delete':
class FipNamespace(object):
- def __init__(self, ext_net_id, agent_conf, driver, root_helper, use_ipv6):
+ def __init__(self, ext_net_id, agent_conf, driver, use_ipv6):
self._ext_net_id = ext_net_id
self.agent_conf = agent_conf
self.driver = driver
- self.root_helper = root_helper
self.use_ipv6 = use_ipv6
self.agent_gateway_port = None
self._subscribers = set()
def _gateway_added(self, ex_gw_port, interface_name):
"""Add Floating IP gateway port."""
ns_name = self.get_name()
- if not ip_lib.device_exists(interface_name,
- root_helper=self.root_helper,
- namespace=ns_name):
+ if not ip_lib.device_exists(interface_name, namespace=ns_name):
self.driver.plug(ex_gw_port['network_id'],
ex_gw_port['id'],
interface_name,
ip_lib.send_gratuitous_arp(ns_name,
interface_name,
ip_address,
- self.agent_conf.send_arp_for_ha,
- self.root_helper)
+ self.agent_conf.send_arp_for_ha)
gw_ip = ex_gw_port['subnet']['gateway_ip']
if gw_ip:
- ipd = ip_lib.IPDevice(interface_name,
- self.root_helper,
- namespace=ns_name)
+ ipd = ip_lib.IPDevice(interface_name, namespace=ns_name)
ipd.route.add_gateway(gw_ip)
cmd = ['sysctl', '-w', 'net.ipv4.conf.%s.proxy_arp=1' % interface_name]
# TODO(Carl) mlavelle's work has self.ip_wrapper
- ip_wrapper = ip_lib.IPWrapper(self.root_helper, namespace=ns_name)
+ ip_wrapper = ip_lib.IPWrapper(namespace=ns_name)
ip_wrapper.netns.execute(cmd, check_exit_code=False)
def create(self):
# TODO(Carl) Get this functionality from mlavelle's namespace baseclass
- ip_wrapper_root = ip_lib.IPWrapper(self.root_helper)
+ ip_wrapper_root = ip_lib.IPWrapper()
ip_wrapper = ip_wrapper_root.ensure_namespace(self.get_name())
ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
if self.use_ipv6:
ns = self.get_name()
# TODO(carl) Reconcile this with mlavelle's namespace work
# TODO(carl) mlavelle's work has self.ip_wrapper
- ip_wrapper = ip_lib.IPWrapper(self.root_helper, namespace=ns)
+ ip_wrapper = ip_lib.IPWrapper(namespace=ns)
for d in ip_wrapper.get_devices(exclude_loopback=True):
if d.name.startswith(FIP_2_ROUTER_DEV_PREFIX):
# internal link between IRs and FIP NS
def _internal_ns_interface_added(self, ip_cidr,
interface_name, ns_name):
- ip_wrapper = ip_lib.IPWrapper(self.root_helper, namespace=ns_name)
+ ip_wrapper = ip_lib.IPWrapper(namespace=ns_name)
ip_wrapper.netns.execute(['ip', 'addr', 'add',
ip_cidr, 'dev', interface_name])
if ri.rtr_fip_subnet is None:
ri.rtr_fip_subnet = self.local_subnets.allocate(ri.router_id)
rtr_2_fip, fip_2_rtr = ri.rtr_fip_subnet.get_pair()
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- namespace=ri.ns_name)
+ ip_wrapper = ip_lib.IPWrapper(namespace=ri.ns_name)
device_exists = ip_lib.device_exists(rtr_2_fip_name,
- self.root_helper,
namespace=ri.ns_name)
if not device_exists:
int_dev = ip_wrapper.add_veth(rtr_2_fip_name,
int_dev[1].link.set_up()
# add default route for the link local interface
- device = ip_lib.IPDevice(rtr_2_fip_name,
- self.root_helper,
- namespace=ri.ns_name)
+ device = ip_lib.IPDevice(rtr_2_fip_name, namespace=ri.ns_name)
device.route.add_gateway(str(fip_2_rtr.ip), table=FIP_RT_TBL)
#setup the NAT rules and chains
ri._handle_fip_nat_rules(rtr_2_fip_name, 'add_rules')
# scan system for any existing fip ports
ri.dist_fip_count = 0
rtr_2_fip_interface = self.get_rtr_ext_device_name(ri.router_id)
- if ip_lib.device_exists(rtr_2_fip_interface,
- root_helper=self.root_helper,
- namespace=ri.ns_name):
- device = ip_lib.IPDevice(rtr_2_fip_interface,
- self.root_helper,
- namespace=ri.ns_name)
+ if ip_lib.device_exists(rtr_2_fip_interface, namespace=ri.ns_name):
+ device = ip_lib.IPDevice(rtr_2_fip_interface, namespace=ri.ns_name)
existing_cidrs = [addr['cidr'] for addr in device.addr.list()]
fip_cidrs = [c for c in existing_cidrs if
common_utils.is_cidr_host(c)]
rule_pr = self.fip_ns.allocate_rule_priority()
self.floating_ips_dict[floating_ip] = rule_pr
fip_2_rtr_name = self.fip_ns.get_int_device_name(self.router_id)
- ip_rule = ip_lib.IpRule(self.root_helper, namespace=self.ns_name)
+ ip_rule = ip_lib.IpRule(namespace=self.ns_name)
ip_rule.add(fixed_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
#Add routing rule in fip namespace
fip_ns_name = self.fip_ns.get_name()
rtr_2_fip, _ = self.rtr_fip_subnet.get_pair()
- device = ip_lib.IPDevice(fip_2_rtr_name, self.root_helper,
- namespace=fip_ns_name)
+ device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
device.route.add_route(fip_cidr, str(rtr_2_fip.ip))
interface_name = (
self.fip_ns.get_ext_device_name(
ip_lib.send_garp_for_proxyarp(fip_ns_name,
interface_name,
floating_ip,
- self.agent_conf.send_arp_for_ha,
- self.root_helper)
+ self.agent_conf.send_arp_for_ha)
# update internal structures
self.dist_fip_count = self.dist_fip_count + 1
fip_ns_name = self.fip_ns.get_name()
if floating_ip in self.floating_ips_dict:
rule_pr = self.floating_ips_dict[floating_ip]
- ip_rule = ip_lib.IpRule(self.root_helper, namespace=self.ns_name)
+ ip_rule = ip_lib.IpRule(namespace=self.ns_name)
ip_rule.delete(floating_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
self.fip_ns.deallocate_rule_priority(rule_pr)
#TODO(rajeev): Handle else case - exception/log?
- device = ip_lib.IPDevice(fip_2_rtr_name, self.root_helper,
- namespace=fip_ns_name)
+ device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
device.route.delete_route(fip_cidr, str(rtr_2_fip.ip))
# check if this is the last FIP for this router
self.dist_fip_count = self.dist_fip_count - 1
if self.dist_fip_count == 0:
#remove default route entry
- device = ip_lib.IPDevice(rtr_2_fip_name,
- self.root_helper,
- namespace=self.ns_name)
- ns_ip = ip_lib.IPWrapper(self.root_helper,
- namespace=fip_ns_name)
+ device = ip_lib.IPDevice(rtr_2_fip_name, namespace=self.ns_name)
+ ns_ip = ip_lib.IPWrapper(namespace=fip_ns_name)
device.route.delete_gateway(str(fip_2_rtr.ip),
table=dvr_fip_ns.FIP_RT_TBL)
self.fip_ns.local_subnets.release(self.router_id)
self.router['id'],
keepalived.KeepalivedConf(),
conf_path=self.agent_conf.ha_confs_path,
- namespace=self.ns_name,
- root_helper=self.root_helper)
+ namespace=self.ns_name)
def _init_keepalived_manager(self):
# TODO(Carl) This looks a bit funny, doesn't it?
process = keepalived.KeepalivedManager.get_process(
self.agent_conf,
self.router_id,
- self.root_helper,
self.ns_name,
self.agent_conf.ha_confs_path)
if process.active:
a VIP to keepalived. This means that the IPv6 link local address
will only be present on the master.
"""
- device = ip_lib.IPDevice(interface_name,
- self.root_helper,
- self.ns_name)
+ device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
ipv6_lladdr = self._get_ipv6_lladdr(device.link.address)
if self._should_delete_ipv6_lladdr(ipv6_lladdr):
def __init__(self,
router_id,
router,
- root_helper,
agent_conf,
interface_driver,
use_ipv6=False,
+ root_helper=None,
ns_name=None):
self.router_id = router_id
self.ex_gw_port = None
self._snat_action = None
self.internal_ports = []
self.floating_ips = set()
- self.root_helper = root_helper
# Invoke the setter for establishing initial SNAT action
self.router = router
self.ns_name = ns_name
def _update_routing_table(self, operation, route):
cmd = ['ip', 'route', operation, 'to', route['destination'],
'via', route['nexthop']]
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- namespace=self.ns_name)
+ ip_wrapper = ip_lib.IPWrapper(namespace=self.ns_name)
ip_wrapper.netns.execute(cmd, check_exit_code=False)
def routes_updated(self):
for ip_cidr, ip_version in previous.items():
if ip_cidr not in preserve_ips:
device.addr.delete(ip_version, ip_cidr)
- self.delete_conntrack_state(root_helper=self.root_helper,
- namespace=namespace,
- ip=ip_cidr)
+ self.delete_conntrack_state(namespace=namespace, ip=ip_cidr)
if gateway:
device.route.add_gateway(gateway)
for route in existing_onlink_routes - new_onlink_routes:
device.route.delete_onlink_route(route)
- def delete_conntrack_state(self, root_helper, namespace, ip):
+ def delete_conntrack_state(self, namespace, ip):
"""Delete conntrack state associated with an IP address.
This terminates any active connections through an IP. Call this soon
after removing the IP address from an interface so that new connections
cannot be created before the IP address is gone.
- root_helper: root_helper to gain root access to call conntrack
namespace: the name of the namespace where the IP has been configured
ip: the IP address for which state should be removed. This can be
passed as a string with or without /NN. A netaddr.IPAddress or
netaddr.Network representing the IP address can also be passed.
"""
ip_str = str(netaddr.IPNetwork(ip).ip)
- ip_wrapper = ip_lib.IPWrapper(root_helper, namespace=namespace)
+ ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
# Delete conntrack state for ingress traffic
# If 0 flow entries have been deleted
"""
def __init__(self, resource_id, config, conf_path='/tmp',
- namespace=None, root_helper=None):
+ namespace=None):
self.resource_id = resource_id
self.config = config
self.namespace = namespace
- self.root_helper = root_helper
self.conf_path = conf_path
self.conf = cfg.CONF
self.process = None
self.process = self.get_process(self.conf,
self.resource_id,
- self.root_helper,
self.namespace,
self.conf_path)
self.restart()
@classmethod
- def get_process(cls, conf, resource_id, root_helper, namespace, conf_path):
+ def get_process(cls, conf, resource_id, namespace, conf_path):
return external_process.ProcessManager(
conf,
resource_id,
# from device drivers, which are now provided a service instance.
# TODO(pcm): Address this in future refactorings.
self.conf = l3_agent.conf
- self.root_helper = l3_agent.root_helper
@classmethod
def instance(cls, l3_agent):
from oslo_config import cfg
from oslo_utils import importutils
-from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.common import exceptions as nexception
from neutron.common import topics
msg = _('Error importing FWaaS device driver: %s')
raise ImportError(msg % fwaas_driver_class_path)
self.services_sync = False
- self.root_helper = config.get_root_helper(conf)
# setup RPC to msg fwaas plugin
self.fwplugin_rpc = FWaaSL3PluginApi(topics.FIREWALL_PLUGIN,
conf.host)
def _get_router_info_list_for_tenant(self, routers, tenant_id):
"""Returns the list of router info objects on which to apply the fw."""
- root_ip = ip_lib.IPWrapper(self.root_helper)
+ root_ip = ip_lib.IPWrapper()
# Get the routers for the tenant
router_ids = [
router['id']
for router in routers
if router['tenant_id'] == tenant_id]
- local_ns_list = root_ip.get_namespaces(
- self.root_helper) if self.conf.use_namespaces else []
+ local_ns_list = (root_ip.get_namespaces()
+ if self.conf.use_namespaces else [])
router_info_list = []
# Pick up namespaces for Tenant Routers
def test_keepalived_spawn(self):
expected_config = self._get_config()
manager = keepalived.KeepalivedManager('router1', expected_config,
- conf_path=cfg.CONF.state_path,
- root_helper=self.root_helper)
+ conf_path=cfg.CONF.state_path)
self.addCleanup(manager.disable)
manager.spawn()
super(TestRouterInfo, self).setUp()
conf = agent_config.setup_conf()
- agent_config.register_root_helper(conf)
- conf.root_helper = 'sudo'
self.ip_cls_p = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
ip_cls = self.ip_cls_p.start()
self.mock_ip = mock.MagicMock()
ip_cls.return_value = self.mock_ip
- self.ri_kwargs = {'root_helper': conf.root_helper,
- 'agent_conf': conf,
+ self.ri_kwargs = {'agent_conf': conf,
'interface_driver': mock.sentinel.interface_driver}
def _check_agent_method_called(self, calls):
self.fip_ns = dvr_fip_ns.FipNamespace(self.net_id,
self.conf,
self.driver,
- mock.sentinel.root_helper,
use_ipv6=True)
def test_subscribe(self):
send_arp.assert_called_once_with(self.fip_ns.get_name(),
mock.sentinel.interface_name,
'20.0.0.30',
- mock.ANY, mock.ANY)
+ mock.ANY)
@mock.patch.object(ip_lib, 'IPWrapper')
def test_destroy(self, IPWrapper):
agent_conf = mock.Mock()
return dvr_router.DvrRouter(mock.sentinel.router_id,
router,
- mock.sentinel.root_helper,
agent_conf,
mock.sentinel.interface_driver,
**kwargs)
self.conf.register_opts(ha.OPTS)
agent_config.register_interface_driver_opts_helper(self.conf)
agent_config.register_use_namespaces_opts_helper(self.conf)
- agent_config.register_root_helper(self.conf)
agent_config.register_process_monitor_opts(self.conf)
self.conf.register_opts(interface.OPTS)
self.conf.register_opts(external_process.OPTS)
'neutron.agent.linux.interface.NullDriver')
self.conf.set_override('send_arp_for_ha', 1)
self.conf.set_override('state_path', '')
- self.conf.AGENT.root_helper = 'sudo'
self.device_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.device_exists')
'ip_address': '152.10.0.13'}],
'id': _uuid(), 'device_id': _uuid()}]
- self.ri_kwargs = {'root_helper': self.conf.AGENT.root_helper,
- 'agent_conf': self.conf,
+ self.ri_kwargs = {'agent_conf': self.conf,
'interface_driver': mock.sentinel.interface_driver}
def _prepare_internal_network_data(self):
self.assertEqual(self.mock_driver.plug.call_count, 1)
self.assertEqual(self.mock_driver.init_l3.call_count, 1)
self.send_arp.assert_called_once_with(ri.ns_name, interface_name,
- '99.0.1.9',
- mock.ANY, mock.ANY)
+ '99.0.1.9', mock.ANY)
elif action == 'remove':
self.device_exists.return_value = True
agent.internal_network_removed(ri, port)
self.assertEqual(self.mock_driver.init_l3.call_count, 1)
self.send_arp.assert_called_once_with(ri.ns_name,
interface_name,
- '20.0.0.30',
- mock.ANY, mock.ANY)
+ '20.0.0.30', mock.ANY)
kwargs = {'preserve_ips': ['192.168.1.34/32'],
'namespace': 'qrouter-' + router['id'],
'gateway': '20.0.0.1',
self.assertEqual(self.mock_driver.plug.call_count, 0)
self.assertEqual(self.mock_driver.init_l3.call_count, 1)
self.send_arp.assert_called_once_with(ri.ns_name, interface_name,
- '20.0.0.30', mock.ANY, mock.ANY)
+ '20.0.0.30', mock.ANY)
kwargs = {'preserve_ips': ['192.168.1.34/32'],
'namespace': 'qrouter-' + router['id'],
'gateway': '20.0.0.1',
self.assertEqual({}, fip_statuses)
device.addr.delete.assert_called_once_with(4, '15.1.2.3/32')
self.mock_driver.delete_conntrack_state.assert_called_once_with(
- root_helper=self.conf.AGENT.root_helper,
namespace=ri.ns_name,
ip='15.1.2.3/32')