class IpRule(IPWrapper):
+ def _exists(self, ip, ip_version, table, rule_pr):
+ # Typical rule from 'ip rule show':
+ # 4030201: from 1.2.3.4/24 lookup 10203040
+
+ rule_pr = str(rule_pr) + ":"
+ for line in self._as_root([ip_version], 'rule', ['show']).splitlines():
+ parts = line.split()
+ if parts and (parts[0] == rule_pr and
+ parts[2] == str(ip) and
+ parts[-1] == str(table)):
+ return True
+
+ return False
+
def add(self, ip, table, rule_pr):
ip_version = netaddr.IPNetwork(ip).version
- args = ['add', 'from', ip, 'table', table, 'priority', rule_pr]
- ip = self._as_root([ip_version], 'rule', tuple(args))
- return ip
+ if not self._exists(ip, ip_version, table, rule_pr):
+ args = ['add', 'from', ip, 'table', table, 'priority', rule_pr]
+ self._as_root([ip_version], 'rule', tuple(args))
def delete(self, ip, table, rule_pr):
ip_version = netaddr.IPNetwork(ip).version
args = ['del', 'table', table, 'priority', rule_pr]
- ip = self._as_root([ip_version], 'rule', tuple(args))
- return ip
+ self._as_root([ip_version], 'rule', tuple(args))
class IPDevice(SubProcessBase):
SUBNET_SAMPLE2 = ("10.0.0.0/24 dev tap1d7888a7-10 scope link src 10.0.0.2\n"
"10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1")
+RULE_V4_SAMPLE = ("""
+0: from all lookup local
+32766: from all lookup main
+32767: from all lookup default
+101: from 192.168.45.100 lookup 2
+""")
+
+RULE_V6_SAMPLE = ("""
+0: from all lookup local
+32766: from all lookup main
+32767: from all lookup default
+201: from 2001:db8::1 lookup 3
+""")
+
class TestSubProcessBase(base.BaseTestCase):
def setUp(self):
self.execute = self.execute_p.start()
def _test_add_rule(self, ip, table, priority):
+ ip_version = netaddr.IPNetwork(ip).version
+ ip_lib.IpRule('sudo').add(ip, table, priority)
+ call_1 = mock.call([ip_version], 'rule', ['show'],
+ run_as_root=True, namespace=None,
+ log_fail_as_error=True)
+ call_2 = mock.call().splitlines()
+ # This is for call().splitlines().__iter__(), which can't be mocked
+ call_3 = mock.ANY
+ call_4 = mock.call([ip_version], 'rule',
+ ('add', 'from', ip,
+ 'table', table, 'priority', priority),
+ run_as_root=True, namespace=None,
+ log_fail_as_error=True)
+ self.execute.assert_has_calls([call_1, call_2, call_3, call_4])
+
+ def _test_add_rule_exists(self, ip, table, priority, output):
+ self.execute.return_value = output
ip_version = netaddr.IPNetwork(ip).version
ip_lib.IpRule('sudo').add(ip, table, priority)
self.execute.assert_called_once_with([ip_version], 'rule',
- ('add', 'from', ip,
- 'table', table,
- 'priority', priority),
+ ['show'],
run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_rule_v4(self):
self._test_add_rule('192.168.45.100', 2, 100)
+ def test_add_rule_v4_exists(self):
+ self._test_add_rule_exists('192.168.45.100', 2, 101, RULE_V4_SAMPLE)
+
def test_add_rule_v6(self):
self._test_add_rule('2001:db8::1', 3, 200)
+ def test_add_rule_v6_exists(self):
+ self._test_add_rule_exists('2001:db8::1', 3, 201, RULE_V6_SAMPLE)
+
def test_delete_rule_v4(self):
self._test_delete_rule('192.168.45.100', 2, 100)