]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Ensure ip6tables are used only if ipv6 is enabled in kernel
authorJakub Libosvar <libosvar@redhat.com>
Thu, 7 Aug 2014 08:35:07 +0000 (10:35 +0200)
committerJakub Libosvar <libosvar@redhat.com>
Tue, 19 Aug 2014 13:12:18 +0000 (15:12 +0200)
On systems where ipv6 module is not loaded in kernel we need to avoid
usage of ip6tables. This patch reads
/proc/sys/net/ipv6/conf/default/disable_ipv6 file and if ipv6 is
disabled then ip6tables are not used in IptablesManager

Closes-Bug: #1352893

Change-Id: I07e5851aa25eb98b7a97dff86b9850475df85f64

neutron/agent/l3_agent.py
neutron/agent/linux/iptables_firewall.py
neutron/agent/linux/iptables_manager.py
neutron/common/ipv6_utils.py
neutron/services/metering/drivers/iptables/iptables_driver.py
neutron/tests/unit/services/metering/drivers/test_iptables_driver.py
neutron/tests/unit/test_iptables_manager.py
neutron/tests/unit/test_ipv6.py
neutron/tests/unit/test_security_groups_rpc.py

index 15390d873cc0ba4277b18ebacbc7e3376e40c332..84976fb69756f8801a8d890ad730b26366462575 100644 (file)
@@ -32,6 +32,7 @@ from neutron.agent.linux import ra
 from neutron.agent import rpc as agent_rpc
 from neutron.common import config as common_config
 from neutron.common import constants as l3_constants
+from neutron.common import ipv6_utils
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
 from neutron.common import utils as common_utils
@@ -144,7 +145,8 @@ class L3PluginApi(n_rpc.RpcProxy):
 
 class RouterInfo(object):
 
-    def __init__(self, router_id, root_helper, use_namespaces, router):
+    def __init__(self, router_id, root_helper, use_namespaces, router,
+                 use_ipv6=False):
         self.router_id = router_id
         self.ex_gw_port = None
         self._snat_enabled = None
@@ -160,7 +162,7 @@ class RouterInfo(object):
         self.ns_name = NS_PREFIX + router_id if use_namespaces else None
         self.iptables_manager = iptables_manager.IptablesManager(
             root_helper=root_helper,
-            #FIXME(danwent): use_ipv6=True,
+            use_ipv6=use_ipv6,
             namespace=self.ns_name)
         self.routes = []
         # DVR Data
@@ -440,6 +442,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
         super(L3NATAgent, self).__init__(conf=self.conf)
 
         self.target_ex_net_id = None
+        self.use_ipv6 = ipv6_utils.is_enabled()
 
     def _check_config_params(self):
         """Check items in configuration files.
@@ -609,7 +612,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
 
     def _router_added(self, router_id, router):
         ri = RouterInfo(router_id, self.root_helper,
-                        self.conf.use_namespaces, router)
+                        self.conf.use_namespaces, router,
+                        use_ipv6=self.use_ipv6)
         self.router_info[router_id] = ri
         if self.conf.use_namespaces:
             self._create_router_namespace(ri)
@@ -1076,11 +1080,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
                                          SNAT_INT_DEV_PREFIX)
         self._external_gateway_added(ri, ex_gw_port, gw_interface_name,
                                      snat_ns_name, preserve_ips=[])
-        ri.snat_iptables_manager = (
-            iptables_manager.IptablesManager(
-                root_helper=self.root_helper, namespace=snat_ns_name
-            )
-        )
+        ri.snat_iptables_manager = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            namespace=snat_ns_name,
+            use_ipv6=self.use_ipv6)
 
     def external_gateway_added(self, ri, ex_gw_port, interface_name):
         if ri.router['distributed']:
index ff65a9b2771f2597c9a95355c71ad06c6b2eef62..20f56cd610580130315bb9a131018a1c98be1bd0 100644 (file)
@@ -19,6 +19,7 @@ from oslo.config import cfg
 from neutron.agent import firewall
 from neutron.agent.linux import iptables_manager
 from neutron.common import constants
+from neutron.common import ipv6_utils
 from neutron.openstack.common import log as logging
 
 
@@ -41,7 +42,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
     def __init__(self):
         self.iptables = iptables_manager.IptablesManager(
             root_helper=cfg.CONF.AGENT.root_helper,
-            use_ipv6=True)
+            use_ipv6=ipv6_utils.is_enabled())
         # list of port which has security group
         self.filtered_ports = {}
         self._add_fallback_chain_v4v6()
index 759aafe0c7631ddc352645b385f214fc08d05a61..2506d745fffeab49119426c0aa674fb563fe2903 100644 (file)
@@ -625,8 +625,10 @@ class IptablesManager(object):
         cmd_tables = [('iptables', key) for key, table in self.ipv4.items()
                       if name in table._select_chain_set(wrap)]
 
-        cmd_tables += [('ip6tables', key) for key, table in self.ipv6.items()
-                       if name in table._select_chain_set(wrap)]
+        if self.use_ipv6:
+            cmd_tables += [('ip6tables', key)
+                           for key, table in self.ipv6.items()
+                           if name in table._select_chain_set(wrap)]
 
         return cmd_tables
 
index fbe61e49bb6d31e48559440fb21b2af575c0659a..21b86acae192d34eeb58367a820e12d9235af376 100644 (file)
@@ -20,6 +20,9 @@ IPv6-related utilities and helper functions.
 import netaddr
 
 
+_IS_IPV6_ENABLED = None
+
+
 def get_ipv6_addr_by_EUI64(prefix, mac):
     # Check if the prefix is IPv4 address
     isIPv4 = netaddr.valid_ipv4(prefix)
@@ -37,3 +40,14 @@ def get_ipv6_addr_by_EUI64(prefix, mac):
     except TypeError:
         raise TypeError(_('Bad prefix type for generate IPv6 address by '
                           'EUI-64: %s') % prefix)
+
+
+def is_enabled():
+    global _IS_IPV6_ENABLED
+
+    if _IS_IPV6_ENABLED is None:
+        disabled_ipv6_path = "/proc/sys/net/ipv6/conf/default/disable_ipv6"
+        with open(disabled_ipv6_path, 'r') as f:
+            disabled = f.read().strip()
+        _IS_IPV6_ENABLED = disabled == "0"
+    return _IS_IPV6_ENABLED
index 8f2890fadc56945ba6c6f996fce1eaf24029de3f..6c3290502cf04152bc68baed2fe78c69f7998334 100644 (file)
@@ -20,6 +20,7 @@ from neutron.agent.common import config
 from neutron.agent.linux import interface
 from neutron.agent.linux import iptables_manager
 from neutron.common import constants as constants
+from neutron.common import ipv6_utils
 from neutron.common import log
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
@@ -74,7 +75,8 @@ class RouterWithMetering(object):
         self.iptables_manager = iptables_manager.IptablesManager(
             root_helper=self.root_helper,
             namespace=self.ns_name,
-            binary_name=WRAP_NAME)
+            binary_name=WRAP_NAME,
+            use_ipv6=ipv6_utils.is_enabled())
         self.metering_labels = {}
 
 
index fe30edb747b92c751ad1dc5c4a9790981907925c..8f18343a0c20f924f227c8082f54f364e58115be 100644 (file)
@@ -65,7 +65,8 @@ class IptablesDriverTestCase(base.BaseTestCase):
 
         self.iptables_cls.assert_called_with(root_helper='fake_sudo',
                                              namespace=mock.ANY,
-                                             binary_name=mock.ANY)
+                                             binary_name=mock.ANY,
+                                             use_ipv6=mock.ANY)
 
     def test_add_metering_label(self):
         routers = [{'_metering_labels': [
index e38086253f47d85470f7b1357b20b4d262aa51f1..74f0bb7ca587fa935abd5f15b5c2796967ee66e6 100644 (file)
@@ -67,8 +67,8 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
     def setUp(self):
         super(IptablesManagerStateFulTestCase, self).setUp()
         self.root_helper = 'sudo'
-        self.iptables = (iptables_manager.
-                         IptablesManager(root_helper=self.root_helper))
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper)
         self.execute = mock.patch.object(self.iptables, "execute").start()
 
     def test_binary_name(self):
@@ -85,12 +85,32 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
         self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
                          name[:11])
 
-    def test_add_and_remove_chain_custom_binary_name(self):
+    def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
+        expected_calls.insert(2, (
+            mock.call(['ip6tables-save', '-c'],
+                      root_helper=self.root_helper),
+            ''))
+        expected_calls.insert(3, (
+            mock.call(['ip6tables-restore', '-c'],
+                      process_input=filter_dump,
+                      root_helper=self.root_helper),
+            None))
+        expected_calls.extend([
+            (mock.call(['ip6tables-save', '-c'],
+                      root_helper=self.root_helper),
+             ''),
+            (mock.call(['ip6tables-restore', '-c'],
+                      process_input=filter_dump,
+                      root_helper=self.root_helper),
+             None)])
+
+    def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
         bn = ("abcdef" * 5)
 
-        self.iptables = (iptables_manager.
-                         IptablesManager(root_helper=self.root_helper,
-                                         binary_name=bn))
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            binary_name=bn,
+            use_ipv6=use_ipv6)
         self.execute = mock.patch.object(self.iptables, "execute").start()
 
         iptables_args = {'bn': bn[:16]}
@@ -112,6 +132,23 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        'COMMIT\n'
                        '# Completed by iptables_manager\n' % iptables_args)
 
+        filter_dump_ipv6 = ('# Generated by iptables_manager\n'
+                            '*filter\n'
+                            ':neutron-filter-top - [0:0]\n'
+                            ':%(bn)s-FORWARD - [0:0]\n'
+                            ':%(bn)s-INPUT - [0:0]\n'
+                            ':%(bn)s-local - [0:0]\n'
+                            ':%(bn)s-OUTPUT - [0:0]\n'
+                            '[0:0] -A FORWARD -j neutron-filter-top\n'
+                            '[0:0] -A OUTPUT -j neutron-filter-top\n'
+                            '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
+                            '[0:0] -A INPUT -j %(bn)s-INPUT\n'
+                            '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
+                            '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
+                            'COMMIT\n'
+                            '# Completed by iptables_manager\n' %
+                            iptables_args)
+
         filter_dump_mod = ('# Generated by iptables_manager\n'
                            '*filter\n'
                            ':neutron-filter-top - [0:0]\n'
@@ -164,6 +201,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        root_helper=self.root_helper),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               filter_dump_ipv6)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['filter'].add_chain('filter')
@@ -174,12 +215,19 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_empty_chain_custom_binary_name(self):
+    def test_add_and_remove_chain_custom_binary_name(self):
+        self._test_add_and_remove_chain_custom_binary_name_helper(False)
+
+    def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
+        self._test_add_and_remove_chain_custom_binary_name_helper(True)
+
+    def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
         bn = ("abcdef" * 5)[:16]
 
-        self.iptables = (iptables_manager.
-                         IptablesManager(root_helper=self.root_helper,
-                                         binary_name=bn))
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            binary_name=bn,
+            use_ipv6=use_ipv6)
         self.execute = mock.patch.object(self.iptables, "execute").start()
 
         iptables_args = {'bn': bn}
@@ -253,6 +301,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        root_helper=self.root_helper),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               filter_dump)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['filter'].add_chain('filter')
@@ -265,7 +317,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_add_and_remove_chain(self):
+    def test_empty_chain_custom_binary_name(self):
+        self._test_empty_chain_custom_binary_name_helper(False)
+
+    def test_empty_chain_custom_binary_name_with_ipv6(self):
+        self._test_empty_chain_custom_binary_name_helper(True)
+
+    def _test_add_and_remove_chain_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+
         filter_dump_mod = ('# Generated by iptables_manager\n'
                            '*filter\n'
                            ':neutron-filter-top - [0:0]\n'
@@ -300,6 +363,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        root_helper=self.root_helper),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               FILTER_DUMP)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['filter'].add_chain('filter')
@@ -310,7 +377,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_add_filter_rule(self):
+    def test_add_and_remove_chain(self):
+        self._test_add_and_remove_chain_helper(False)
+
+    def test_add_and_remove_chain_with_ipv6(self):
+        self._test_add_and_remove_chain_helper(True)
+
+    def _test_add_filter_rule_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+
         filter_dump_mod = ('# Generated by iptables_manager\n'
                            '*filter\n'
                            ':neutron-filter-top - [0:0]\n'
@@ -349,6 +427,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        ),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               FILTER_DUMP)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['filter'].add_chain('filter')
@@ -369,7 +451,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_rule_with_wrap_target(self):
+    def test_add_filter_rule(self):
+        self._test_add_filter_rule_helper(False)
+
+    def test_add_filter_rule_with_ipv6(self):
+        self._test_add_filter_rule_helper(True)
+
+    def _test_rule_with_wrap_target_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+
         name = '0123456789' * 5
         wrap = "%s-%s" % (iptables_manager.binary_name,
                           iptables_manager.get_chain_name(name))
@@ -413,6 +506,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        root_helper=self.root_helper),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               FILTER_DUMP)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['filter'].add_chain(name)
@@ -430,7 +527,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_add_nat_rule(self):
+    def test_rule_with_wrap_target(self):
+        self._test_rule_with_wrap_target_helper(False)
+
+    def test_rule_with_wrap_target_with_ipv6(self):
+        self._test_rule_with_wrap_target_helper(True)
+
+    def _test_add_nat_rule_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+
         nat_dump = ('# Generated by iptables_manager\n'
                     '*nat\n'
                     ':neutron-postrouting-bottom - [0:0]\n'
@@ -488,6 +596,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                        root_helper=self.root_helper),
              None),
         ]
+        if use_ipv6:
+            self._extend_with_ip6tables_filter(expected_calls_and_values,
+                                               FILTER_DUMP)
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         self.iptables.ipv4['nat'].add_chain('nat')
@@ -512,6 +624,12 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
+    def test_add_nat_rule(self):
+        self._test_add_nat_rule_helper(False)
+
+    def test_add_nat_rule_with_ipv6(self):
+        self._test_add_nat_rule_helper(True)
+
     def test_add_rule_to_a_nonexistent_chain(self):
         self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
                           'nonexistent', '-j DROP')
@@ -604,7 +722,14 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
             'Attempted to get traffic counters of chain %s which '
             'does not exist', 'chain1')
 
-    def test_get_traffic_counters(self):
+    def _test_get_traffic_counters_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+        exp_packets = 800
+        exp_bytes = 131802
+
         iptables_dump = (
             'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
             '    pkts      bytes target     prot opt in     out     source'
@@ -623,20 +748,38 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
                         '-v', '-x'],
                        root_helper=self.root_helper),
              ''),
-            (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
-                        '-n', '-v', '-x'],
-                       root_helper=self.root_helper),
-             iptables_dump),
         ]
+        if use_ipv6:
+            expected_calls_and_values.append(
+                (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
+                           '-n', '-v', '-x'],
+                           root_helper=self.root_helper),
+                 iptables_dump))
+            exp_packets *= 2
+            exp_bytes *= 2
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         acc = self.iptables.get_traffic_counters('OUTPUT')
-        self.assertEqual(acc['pkts'], 1600)
-        self.assertEqual(acc['bytes'], 263604)
+        self.assertEqual(acc['pkts'], exp_packets)
+        self.assertEqual(acc['bytes'], exp_bytes)
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
-    def test_get_traffic_counters_with_zero(self):
+    def test_get_traffic_counters(self):
+        self._test_get_traffic_counters_helper(False)
+
+    def test_get_traffic_counters_with_ipv6(self):
+        self._test_get_traffic_counters_helper(True)
+
+    def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
+        self.iptables = iptables_manager.IptablesManager(
+            root_helper=self.root_helper,
+            use_ipv6=use_ipv6)
+        self.execute = mock.patch.object(self.iptables, "execute").start()
+        exp_packets = 800
+        exp_bytes = 131802
+
         iptables_dump = (
             'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
             '    pkts      bytes target     prot opt in     out     source'
@@ -654,20 +797,31 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
             (mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
                         '-v', '-x', '-Z'],
                        root_helper=self.root_helper),
-             ''),
-            (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
-                        '-n', '-v', '-x', '-Z'],
-                       root_helper=self.root_helper),
-             iptables_dump),
+             '')
         ]
+        if use_ipv6:
+            expected_calls_and_values.append(
+                (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
+                            '-n', '-v', '-x', '-Z'],
+                           root_helper=self.root_helper),
+                 iptables_dump))
+            exp_packets *= 2
+            exp_bytes *= 2
+
         tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
         acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
-        self.assertEqual(acc['pkts'], 1600)
-        self.assertEqual(acc['bytes'], 263604)
+        self.assertEqual(acc['pkts'], exp_packets)
+        self.assertEqual(acc['bytes'], exp_bytes)
 
         tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
+    def test_get_traffic_counters_with_zero(self):
+        self._test_get_traffic_counters_with_zero_helper(False)
+
+    def test_get_traffic_counters_with_zero_with_ipv6(self):
+        self._test_get_traffic_counters_with_zero_helper(True)
+
     def _test_find_last_entry(self, find_str):
         filter_list = [':neutron-filter-top - [0:0]',
                        ':%(bn)s-FORWARD - [0:0]',
index 47bfd2a4b03ff3c75af69524084592a9e377d190..2dea82bcb5fb026cff3b7e84e652570f38ab9c57 100644 (file)
@@ -13,6 +13,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import mock
+
 from neutron.common import ipv6_utils
 from neutron.tests import base
 
@@ -48,3 +50,29 @@ class IPv6byEUI64TestCase(base.BaseTestCase):
         prefix = 123
         self.assertRaises(TypeError, lambda:
                           ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac))
+
+
+class TestIsEnabled(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestIsEnabled, self).setUp()
+        ipv6_utils._IS_IPV6_ENABLED = None
+        mock_open = mock.patch("__builtin__.open").start()
+        self.mock_read = mock_open.return_value.__enter__.return_value.read
+
+    def test_enabled(self):
+        self.mock_read.return_value = "0"
+        enabled = ipv6_utils.is_enabled()
+        self.assertTrue(enabled)
+
+    def test_disabled(self):
+        self.mock_read.return_value = "1"
+        enabled = ipv6_utils.is_enabled()
+        self.assertFalse(enabled)
+
+    def test_memoize(self):
+        self.mock_read.return_value = "0"
+        ipv6_utils.is_enabled()
+        enabled = ipv6_utils.is_enabled()
+        self.assertTrue(enabled)
+        self.mock_read.assert_called_once_with()
index 52e66a25821d074c6ed1afa9036054cda96370da..81a43ea5bfab187b5927a254f96369ef2bc9a5ff 100644 (file)
@@ -1821,6 +1821,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
         self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
 
         self.iptables = self.agent.firewall.iptables
+        # TODO(jlibosva) Get rid of mocking iptables execute and mock out
+        # firewall instead
+        self.iptables.use_ipv6 = True
         self.iptables_execute = mock.patch.object(self.iptables,
                                                   "execute").start()
         self.iptables_execute_return_values = []