from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
+from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as common_utils
class RouterInfo(object):
- def __init__(self, router_id, root_helper, use_namespaces, router):
+ def __init__(self, router_id, root_helper, use_namespaces, router,
+ use_ipv6=False):
self.router_id = router_id
self.ex_gw_port = None
self._snat_enabled = None
self.ns_name = NS_PREFIX + router_id if use_namespaces else None
self.iptables_manager = iptables_manager.IptablesManager(
root_helper=root_helper,
- #FIXME(danwent): use_ipv6=True,
+ use_ipv6=use_ipv6,
namespace=self.ns_name)
self.routes = []
# DVR Data
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
+ self.use_ipv6 = ipv6_utils.is_enabled()
def _check_config_params(self):
"""Check items in configuration files.
def _router_added(self, router_id, router):
ri = RouterInfo(router_id, self.root_helper,
- self.conf.use_namespaces, router)
+ self.conf.use_namespaces, router,
+ use_ipv6=self.use_ipv6)
self.router_info[router_id] = ri
if self.conf.use_namespaces:
self._create_router_namespace(ri)
SNAT_INT_DEV_PREFIX)
self._external_gateway_added(ri, ex_gw_port, gw_interface_name,
snat_ns_name, preserve_ips=[])
- ri.snat_iptables_manager = (
- iptables_manager.IptablesManager(
- root_helper=self.root_helper, namespace=snat_ns_name
- )
- )
+ ri.snat_iptables_manager = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ namespace=snat_ns_name,
+ use_ipv6=self.use_ipv6)
def external_gateway_added(self, ri, ex_gw_port, interface_name):
if ri.router['distributed']:
def setUp(self):
super(IptablesManagerStateFulTestCase, self).setUp()
self.root_helper = 'sudo'
- self.iptables = (iptables_manager.
- IptablesManager(root_helper=self.root_helper))
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper)
self.execute = mock.patch.object(self.iptables, "execute").start()
def test_binary_name(self):
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
name[:11])
- def test_add_and_remove_chain_custom_binary_name(self):
+ def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
+ expected_calls.insert(2, (
+ mock.call(['ip6tables-save', '-c'],
+ root_helper=self.root_helper),
+ ''))
+ expected_calls.insert(3, (
+ mock.call(['ip6tables-restore', '-c'],
+ process_input=filter_dump,
+ root_helper=self.root_helper),
+ None))
+ expected_calls.extend([
+ (mock.call(['ip6tables-save', '-c'],
+ root_helper=self.root_helper),
+ ''),
+ (mock.call(['ip6tables-restore', '-c'],
+ process_input=filter_dump,
+ root_helper=self.root_helper),
+ None)])
+
+ def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("abcdef" * 5)
- self.iptables = (iptables_manager.
- IptablesManager(root_helper=self.root_helper,
- binary_name=bn))
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ binary_name=bn,
+ use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn[:16]}
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
+ filter_dump_ipv6 = ('# Generated by iptables_manager\n'
+ '*filter\n'
+ ':neutron-filter-top - [0:0]\n'
+ ':%(bn)s-FORWARD - [0:0]\n'
+ ':%(bn)s-INPUT - [0:0]\n'
+ ':%(bn)s-local - [0:0]\n'
+ ':%(bn)s-OUTPUT - [0:0]\n'
+ '[0:0] -A FORWARD -j neutron-filter-top\n'
+ '[0:0] -A OUTPUT -j neutron-filter-top\n'
+ '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
+ '[0:0] -A INPUT -j %(bn)s-INPUT\n'
+ '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
+ '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
+ 'COMMIT\n'
+ '# Completed by iptables_manager\n' %
+ iptables_args)
+
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
root_helper=self.root_helper),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ filter_dump_ipv6)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_empty_chain_custom_binary_name(self):
+ def test_add_and_remove_chain_custom_binary_name(self):
+ self._test_add_and_remove_chain_custom_binary_name_helper(False)
+
+ def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
+ self._test_add_and_remove_chain_custom_binary_name_helper(True)
+
+ def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("abcdef" * 5)[:16]
- self.iptables = (iptables_manager.
- IptablesManager(root_helper=self.root_helper,
- binary_name=bn))
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ binary_name=bn,
+ use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn}
root_helper=self.root_helper),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ filter_dump)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_add_and_remove_chain(self):
+ def test_empty_chain_custom_binary_name(self):
+ self._test_empty_chain_custom_binary_name_helper(False)
+
+ def test_empty_chain_custom_binary_name_with_ipv6(self):
+ self._test_empty_chain_custom_binary_name_helper(True)
+
+ def _test_add_and_remove_chain_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
root_helper=self.root_helper),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ FILTER_DUMP)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_add_filter_rule(self):
+ def test_add_and_remove_chain(self):
+ self._test_add_and_remove_chain_helper(False)
+
+ def test_add_and_remove_chain_with_ipv6(self):
+ self._test_add_and_remove_chain_helper(True)
+
+ def _test_add_filter_rule_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ FILTER_DUMP)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_rule_with_wrap_target(self):
+ def test_add_filter_rule(self):
+ self._test_add_filter_rule_helper(False)
+
+ def test_add_filter_rule_with_ipv6(self):
+ self._test_add_filter_rule_helper(True)
+
+ def _test_rule_with_wrap_target_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+
name = '0123456789' * 5
wrap = "%s-%s" % (iptables_manager.binary_name,
iptables_manager.get_chain_name(name))
root_helper=self.root_helper),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ FILTER_DUMP)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain(name)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_add_nat_rule(self):
+ def test_rule_with_wrap_target(self):
+ self._test_rule_with_wrap_target_helper(False)
+
+ def test_rule_with_wrap_target_with_ipv6(self):
+ self._test_rule_with_wrap_target_helper(True)
+
+ def _test_add_nat_rule_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+
nat_dump = ('# Generated by iptables_manager\n'
'*nat\n'
':neutron-postrouting-bottom - [0:0]\n'
root_helper=self.root_helper),
None),
]
+ if use_ipv6:
+ self._extend_with_ip6tables_filter(expected_calls_and_values,
+ FILTER_DUMP)
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['nat'].add_chain('nat')
tools.verify_mock_calls(self.execute, expected_calls_and_values)
+ def test_add_nat_rule(self):
+ self._test_add_nat_rule_helper(False)
+
+ def test_add_nat_rule_with_ipv6(self):
+ self._test_add_nat_rule_helper(True)
+
def test_add_rule_to_a_nonexistent_chain(self):
self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
'nonexistent', '-j DROP')
'Attempted to get traffic counters of chain %s which '
'does not exist', 'chain1')
- def test_get_traffic_counters(self):
+ def _test_get_traffic_counters_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+ exp_packets = 800
+ exp_bytes = 131802
+
iptables_dump = (
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
' pkts bytes target prot opt in out source'
'-v', '-x'],
root_helper=self.root_helper),
''),
- (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
- '-n', '-v', '-x'],
- root_helper=self.root_helper),
- iptables_dump),
]
+ if use_ipv6:
+ expected_calls_and_values.append(
+ (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
+ '-n', '-v', '-x'],
+ root_helper=self.root_helper),
+ iptables_dump))
+ exp_packets *= 2
+ exp_bytes *= 2
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT')
- self.assertEqual(acc['pkts'], 1600)
- self.assertEqual(acc['bytes'], 263604)
+ self.assertEqual(acc['pkts'], exp_packets)
+ self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_get_traffic_counters_with_zero(self):
+ def test_get_traffic_counters(self):
+ self._test_get_traffic_counters_helper(False)
+
+ def test_get_traffic_counters_with_ipv6(self):
+ self._test_get_traffic_counters_helper(True)
+
+ def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
+ self.iptables = iptables_manager.IptablesManager(
+ root_helper=self.root_helper,
+ use_ipv6=use_ipv6)
+ self.execute = mock.patch.object(self.iptables, "execute").start()
+ exp_packets = 800
+ exp_bytes = 131802
+
iptables_dump = (
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
' pkts bytes target prot opt in out source'
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
'-v', '-x', '-Z'],
root_helper=self.root_helper),
- ''),
- (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
- '-n', '-v', '-x', '-Z'],
- root_helper=self.root_helper),
- iptables_dump),
+ '')
]
+ if use_ipv6:
+ expected_calls_and_values.append(
+ (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
+ '-n', '-v', '-x', '-Z'],
+ root_helper=self.root_helper),
+ iptables_dump))
+ exp_packets *= 2
+ exp_bytes *= 2
+
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
- self.assertEqual(acc['pkts'], 1600)
- self.assertEqual(acc['bytes'], 263604)
+ self.assertEqual(acc['pkts'], exp_packets)
+ self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
+ def test_get_traffic_counters_with_zero(self):
+ self._test_get_traffic_counters_with_zero_helper(False)
+
+ def test_get_traffic_counters_with_zero_with_ipv6(self):
+ self._test_get_traffic_counters_with_zero_helper(True)
+
def _test_find_last_entry(self, find_str):
filter_list = [':neutron-filter-top - [0:0]',
':%(bn)s-FORWARD - [0:0]',