# Process SNAT rules for external gateway
if (not ri.router['distributed'] or
ex_gw_port and ri.router['gw_port_host'] == self.host):
- # Get IPv4 only internal CIDRs
- internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
- if netaddr.IPNetwork(p['ip_cidr']).version == 4]
ri.perform_snat_action(self._handle_router_snat_rules,
- internal_cidrs, interface_name)
+ interface_name)
# Process SNAT/DNAT rules for floating IPs
fip_statuses = {}
else:
ri.disable_keepalived()
- def _handle_router_snat_rules(self, ri, ex_gw_port, internal_cidrs,
+ def _handle_router_snat_rules(self, ri, ex_gw_port,
interface_name, action):
# Remove all the rules
# This is safe because if use_namespaces is set as False
ex_gw_ip = ip_addr['ip_address']
if netaddr.IPAddress(ex_gw_ip).version == 4:
rules = self.external_gateway_nat_rules(ex_gw_ip,
- internal_cidrs,
interface_name)
for rule in rules:
iptables_manager.ipv4['nat'].add_rule(*rule)
'--to-port %s' % self.conf.metadata_port))
return rules
- def external_gateway_nat_rules(self, ex_gw_ip, internal_cidrs,
- interface_name):
+ def external_gateway_nat_rules(self, ex_gw_ip, interface_name):
rules = [('POSTROUTING', '! -i %(interface_name)s '
'! -o %(interface_name)s -m conntrack ! '
'--ctstate DNAT -j ACCEPT' %
- {'interface_name': interface_name})]
- for cidr in internal_cidrs:
- rules.extend(self.internal_network_nat_rules(ex_gw_ip, cidr))
+ {'interface_name': interface_name}),
+ ('snat', '-o %s -j SNAT --to-source %s' %
+ (interface_name, ex_gw_ip))]
return rules
def _snat_redirect_add(self, ri, gateway, sn_port, sn_int):
self.driver.unplug(interface_name, namespace=ri.ns_name,
prefix=INTERNAL_DEV_PREFIX)
- def internal_network_nat_rules(self, ex_gw_ip, internal_cidr):
- rules = [('snat', '-s %s -j SNAT --to-source %s' %
- (internal_cidr, ex_gw_ip))]
- return rules
-
def _create_agent_gateway_port(self, ri, network_id):
"""Create Floating IP gateway port.
interface_name = ('qg-%s' % router['gw_port']['id'])[:14]
expected_rules = [
'! -i %s ! -o %s -m conntrack ! --ctstate DNAT -j ACCEPT' %
- (interface_name, interface_name)]
- for source_cidr in source_cidrs:
- # Create SNAT rules for IPv4 only
- if (netaddr.IPNetwork(source_cidr).version == 4 and
- netaddr.IPNetwork(source_nat_ip).version == 4):
- value_dict = {'source_cidr': source_cidr,
- 'source_nat_ip': source_nat_ip}
- expected_rules.append('-s %(source_cidr)s -j SNAT --to-source '
- '%(source_nat_ip)s' % value_dict)
+ (interface_name, interface_name),
+ '-o %s -j SNAT --to-source %s' % (interface_name, source_nat_ip)]
for r in rules:
if negate:
self.assertNotIn(r.rule, expected_rules)
agent.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an interface and reprocess
router_append_interface(router)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
- # For some reason set logic does not work well with
- # IpTablesRule instances
- nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
- if r not in orig_nat_rules]
- self.assertEqual(len(nat_rules_delta), 1)
- self._verify_snat_rules(nat_rules_delta, router)
# send_arp is called both times process_router is called
self.assertEqual(self.send_arp.call_count, 2)
agent.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv4 and IPv6 interface and reprocess
router_append_interface(router, count=1, ip_version=4)
router_append_interface(router, count=1, ip_version=6)
ri.router = router
agent.process_router(ri)
self._assert_ri_process_enabled(ri, 'radvd')
- # For some reason set logic does not work well with
- # IpTablesRule instances
- nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
- if r not in orig_nat_rules]
- self.assertEqual(1, len(nat_rules_delta))
- self._verify_snat_rules(nat_rules_delta, router)
def test_process_router_interface_removed(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent.external_gateway_added = mock.Mock()
# Process with NAT
agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
- # For some reason set logic does not work well with
- # IpTablesRule instances
- nat_rules_delta = [r for r in orig_nat_rules
- if r not in ri.iptables_manager.ipv4['nat'].rules]
- self.assertEqual(len(nat_rules_delta), 1)
- self._verify_snat_rules(nat_rules_delta, router, negate=True)
# send_arp is called both times process_router is called
self.assertEqual(self.send_arp.call_count, 2)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
with mock.patch.object(l3_agent.LOG, 'debug') as log_debug:
agent._handle_router_snat_rules(
- ri, mock.ANY, mock.ANY, mock.ANY, mock.ANY)
+ ri, mock.ANY, mock.ANY, mock.ANY)
self.assertIsNone(ri.snat_iptables_manager)
self.assertFalse(ri.iptables_manager.called)
self.assertTrue(log_debug.called)
port = {'fixed_ips': [{'ip_address': '192.168.1.4'}]}
ri.router = {'distributed': False}
- agent._handle_router_snat_rules(ri, port, [], "iface", "add_rules")
+ agent._handle_router_snat_rules(ri, port, "iface", "add_rules")
nat = ri.iptables_manager.ipv4['nat']
nat.empty_chain.assert_any_call('snat')
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
ri = l3_agent.RouterInfo(_uuid(), self.conf.root_helper, {})
ex_gw_port = {'fixed_ips': [{'ip_address': '192.168.1.4'}]}
- internal_cidrs = ['10.0.0.0/24']
ri.router = {'distributed': False}
- agent._handle_router_snat_rules(ri, ex_gw_port, internal_cidrs,
+ agent._handle_router_snat_rules(ri, ex_gw_port,
"iface", "add_rules")
nat_rules = map(str, ri.iptables_manager.ipv4['nat'].rules)
jump_float_rule = "-A %s-snat -j %s-float-snat" % (wrap_name,
wrap_name)
- internal_net_rule = ("-A %s-snat -s %s -j SNAT --to-source %s") % (
- wrap_name, internal_cidrs[0],
- ex_gw_port['fixed_ips'][0]['ip_address'])
+ snat_rule = ("-A %s-snat -o iface -j SNAT --to-source %s") % (
+ wrap_name, ex_gw_port['fixed_ips'][0]['ip_address'])
self.assertIn(jump_float_rule, nat_rules)
- self.assertIn(internal_net_rule, nat_rules)
+ self.assertIn(snat_rule, nat_rules)
self.assertThat(nat_rules.index(jump_float_rule),
- matchers.LessThan(nat_rules.index(internal_net_rule)))
+ matchers.LessThan(nat_rules.index(snat_rule)))
def test_process_router_delete_stale_internal_devices(self):
class FakeDev(object):