gw_port = self._router.get('gw_port')
self._handle_router_snat_rules(gw_port, interface_name)
- def external_gateway_nat_rules(self, ex_gw_ip, interface_name):
+ def external_gateway_nat_postroute_rules(self, interface_name):
dont_snat_traffic_to_internal_ports_if_not_to_floating_ip = (
'POSTROUTING', '! -i %(interface_name)s '
'! -o %(interface_name)s -m conntrack ! '
'--ctstate DNAT -j ACCEPT' %
{'interface_name': interface_name})
+ return [dont_snat_traffic_to_internal_ports_if_not_to_floating_ip]
+ def external_gateway_nat_snat_rules(self, ex_gw_ip, interface_name):
snat_normal_external_traffic = (
'snat', '-o %s -j SNAT --to-source %s' %
(interface_name, ex_gw_ip))
'-m conntrack --ctstate DNAT '
'-j SNAT --to-source %s'
% (ext_in_mark, l3_constants.ROUTER_MARK_MASK, ex_gw_ip))
-
- return [dont_snat_traffic_to_internal_ports_if_not_to_floating_ip,
- snat_normal_external_traffic,
+ return [snat_normal_external_traffic,
snat_internal_traffic_to_floating_ip]
def external_gateway_mangle_rules(self, interface_name):
def _add_snat_rules(self, ex_gw_port, iptables_manager,
interface_name):
- if self._snat_enabled and ex_gw_port:
+ if ex_gw_port:
# ex_gw_port should not be None in this case
# NAT rules are added only if ex_gw_port has an IPv4 address
for ip_addr in ex_gw_port['fixed_ips']:
ex_gw_ip = ip_addr['ip_address']
if netaddr.IPAddress(ex_gw_ip).version == 4:
- rules = self.external_gateway_nat_rules(ex_gw_ip,
- interface_name)
+ rules = self.external_gateway_nat_postroute_rules(
+ interface_name)
for rule in rules:
iptables_manager.ipv4['nat'].add_rule(*rule)
- rules = self.external_gateway_mangle_rules(interface_name)
- for rule in rules:
- iptables_manager.ipv4['mangle'].add_rule(*rule)
+ if self._snat_enabled:
+ rules = self.external_gateway_nat_snat_rules(
+ ex_gw_ip, interface_name)
+ for rule in rules:
+ iptables_manager.ipv4['nat'].add_rule(*rule)
+ rules = self.external_gateway_mangle_rules(
+ interface_name)
+ for rule in rules:
+ iptables_manager.ipv4['mangle'].add_rule(*rule)
break
def _handle_router_snat_rules(self, ex_gw_port, interface_name):
# IpTablesRule instances
nat_rules_delta = [r for r in orig_nat_rules
if r not in ri.iptables_manager.ipv4['nat'].rules]
- self.assertEqual(3, len(nat_rules_delta))
+ self.assertEqual(2, len(nat_rules_delta))
mangle_rules_delta = [
r for r in orig_mangle_rules
if r not in ri.iptables_manager.ipv4['mangle'].rules]
# IpTablesRule instances
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
if r not in orig_nat_rules]
- self.assertEqual(3, len(nat_rules_delta))
+ self.assertEqual(2, len(nat_rules_delta))
mangle_rules_delta = [
r for r in ri.iptables_manager.ipv4['mangle'].rules
if r not in orig_mangle_rules]
# Get NAT rules with the gw_port
router['gw_port'] = gw_port
ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
- orig_ext_gw_nat_rules = ri.external_gateway_nat_rules
- with mock.patch.object(
- ri,
- 'external_gateway_nat_rules') as external_gateway_nat_rules:
- external_gateway_nat_rules.side_effect = orig_ext_gw_nat_rules
+ p = ri.external_gateway_nat_postroute_rules
+ s = ri.external_gateway_nat_snat_rules
+ attrs_to_mock = dict(
+ [(a, mock.DEFAULT) for a in
+ ['external_gateway_nat_postroute_rules',
+ 'external_gateway_nat_snat_rules']]
+ )
+ with mock.patch.multiple(ri, **attrs_to_mock) as mocks:
+ mocks['external_gateway_nat_postroute_rules'].side_effect = p
+ mocks['external_gateway_nat_snat_rules'].side_effect = s
self._process_router_instance_for_agent(agent, ri, router)
new_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# NAT rules should only change for dual_stack operation
if dual_stack:
- self.assertTrue(external_gateway_nat_rules.called)
+ self.assertTrue(
+ mocks['external_gateway_nat_postroute_rules'].called)
+ self.assertTrue(
+ mocks['external_gateway_nat_snat_rules'].called)
self.assertNotEqual(orig_nat_rules, new_nat_rules)
else:
- self.assertFalse(external_gateway_nat_rules.called)
+ self.assertFalse(
+ mocks['external_gateway_nat_postroute_rules'].called)
+ self.assertFalse(
+ mocks['external_gateway_nat_snat_rules'].called)
self.assertEqual(orig_nat_rules, new_nat_rules)
def test_process_ipv6_only_gw(self):