ri.floating_ips_dict[floating_ip] = rule_pr
fip_2_rtr_name = self.get_fip_int_device_name(ri.router_id)
ip_rule = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
- ip_rule.add_rule_from(fixed_ip, FIP_RT_TBL, rule_pr)
+ ip_rule.add(fixed_ip, FIP_RT_TBL, rule_pr)
#Add routing rule in fip namespace
fip_ns_name = self.get_fip_ns_name(str(fip['floating_network_id']))
rtr_2_fip, _ = ri.rtr_fip_subnet.get_pair()
ri.rtr_fip_subnet = self.local_subnets.allocate(ri.router_id)
rtr_2_fip, fip_2_rtr = ri.rtr_fip_subnet.get_pair()
fip_ns_name = self.get_fip_ns_name(str(self._fetch_external_net_id()))
- ip_rule_rtr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
if floating_ip in ri.floating_ips_dict:
rule_pr = ri.floating_ips_dict[floating_ip]
- ip_rule_rtr.delete_rule_priority(rule_pr)
+ ip_rule = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
+ ip_rule.delete(floating_ip, FIP_RT_TBL, rule_pr)
self.fip_priorities.add(rule_pr)
#TODO(rajeev): Handle else case - exception/log?
def _snat_redirect_add(self, ri, gateway, sn_port, sn_int):
"""Adds rules and routes for SNAT redirection."""
try:
- snat_idx = self._get_snat_idx(sn_port['ip_cidr'])
+ ip_cidr = sn_port['ip_cidr']
+ snat_idx = self._get_snat_idx(ip_cidr)
ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
namespace=ri.ns_name)
ns_ipd.route.add_gateway(gateway, table=snat_idx)
- ns_ipr.add_rule_from(sn_port['ip_cidr'], snat_idx, snat_idx)
+ ns_ipr.add(ip_cidr, snat_idx, snat_idx)
ns_ipr.netns.execute(['sysctl', '-w', 'net.ipv4.conf.%s.'
'send_redirects=0' % sn_int])
except Exception:
def _snat_redirect_remove(self, ri, sn_port, sn_int):
"""Removes rules and routes for SNAT redirection."""
try:
- snat_idx = self._get_snat_idx(sn_port['ip_cidr'])
+ ip_cidr = sn_port['ip_cidr']
+ snat_idx = self._get_snat_idx(ip_cidr)
ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
namespace=ri.ns_name)
ns_ipd.route.delete_gateway(table=snat_idx)
- ns_ipr.delete_rule_priority(snat_idx)
+ ns_ipr.delete(ip_cidr, snat_idx, snat_idx)
except Exception:
LOG.exception(_LE('DVR: removed snat failed'))
class IpRule(IPWrapper):
- def add_rule_from(self, ip, table, rule_pr):
- args = ['add', 'from', ip, 'lookup', table, 'priority', rule_pr]
- ip = self._as_root('', 'rule', tuple(args))
+ def add(self, ip, table, rule_pr):
+ ip_version = netaddr.IPNetwork(ip).version
+ args = ['add', 'from', ip, 'table', table, 'priority', rule_pr]
+ ip = self._as_root([ip_version], 'rule', tuple(args))
return ip
- def delete_rule_priority(self, rule_pr):
- args = ['del', 'priority', rule_pr]
- ip = self._as_root('', 'rule', tuple(args))
+ def delete(self, ip, table, rule_pr):
+ ip_version = netaddr.IPNetwork(ip).version
+ args = ['del', 'table', table, 'priority', rule_pr]
+ ip = self._as_root([ip_version], 'rule', tuple(args))
return ip
ri.dist_fip_count = 0
ip_cidr = common_utils.ip_to_cidr(fip['floating_ip_address'])
agent.floating_ip_added_dist(ri, fip, ip_cidr)
- self.mock_rule.add_rule_from.assert_called_with('192.168.0.1',
- 16, FIP_PRI)
+ self.mock_rule.add.assert_called_with('192.168.0.1', 16, FIP_PRI)
# TODO(mrsmith): add more asserts
@mock.patch.object(l3_agent.L3NATAgent, '_fip_ns_unsubscribe')
s = lla.LinkLocalAddressPair('169.254.30.42/31')
ri.rtr_fip_subnet = s
agent.floating_ip_removed_dist(ri, fip_cidr)
- self.mock_rule.delete_rule_priority.assert_called_with(FIP_PRI)
+ floating_ip = fip_cidr.split('/')[0]
+ self.mock_rule.delete.assert_called_with(floating_ip, 16, FIP_PRI)
self.mock_ip_dev.route.delete_route.assert_called_with(fip_cidr,
str(s.ip))
self.assertFalse(unsubscribe.called, '_fip_ns_unsubscribe called!')
import os
import mock
+import netaddr
from neutron.agent.linux import ip_lib
from neutron.common import exceptions
self.execute_p = mock.patch.object(ip_lib.IpRule, '_execute')
self.execute = self.execute_p.start()
- def test_add_rule_from(self):
- ip_lib.IpRule('sudo').add_rule_from('192.168.45.100', 2, 100)
- self.execute.assert_called_once_with('', 'rule',
- ('add', 'from', '192.168.45.100',
- 'lookup', 2, 'priority', 100),
+ def _test_add_rule(self, ip, table, priority):
+ ip_version = netaddr.IPNetwork(ip).version
+ ip_lib.IpRule('sudo').add(ip, table, priority)
+ self.execute.assert_called_once_with([ip_version], 'rule',
+ ('add', 'from', ip,
+ 'table', table,
+ 'priority', priority),
'sudo', None,
log_fail_as_error=True)
- def test_delete_rule_priority(self):
- ip_lib.IpRule('sudo').delete_rule_priority(100)
- self.execute.assert_called_once_with('', 'rule',
- ('del', 'priority', 100),
+ def _test_delete_rule(self, ip, table, priority):
+ ip_version = netaddr.IPNetwork(ip).version
+ ip_lib.IpRule('sudo').delete(ip, table, priority)
+ self.execute.assert_called_once_with([ip_version], 'rule',
+ ('del', 'table', table,
+ 'priority', priority),
'sudo', None,
log_fail_as_error=True)
+ def test_add_rule_v4(self):
+ self._test_add_rule('192.168.45.100', 2, 100)
+
+ def test_add_rule_v6(self):
+ self._test_add_rule('2001:db8::1', 3, 200)
+
+ def test_delete_rule_v4(self):
+ self._test_delete_rule('192.168.45.100', 2, 100)
+
+ def test_delete_rule_v6(self):
+ self._test_delete_rule('2001:db8::1', 3, 200)
+
class TestIPDevice(base.BaseTestCase):
def test_eq_same_name(self):