]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
avoid changing the rule's own action
authorYong Sheng Gong <gongysh@unitedstack.com>
Thu, 17 Oct 2013 05:54:25 +0000 (13:54 +0800)
committerYong Sheng Gong <gongysh@unitedstack.com>
Thu, 17 Oct 2013 08:35:05 +0000 (16:35 +0800)
fixes Bug #1240812

When iterating through the rule list, we should not change the rule itself.
So that we can use the rule data for the second router.

Change-Id: I613438507949007bec41416e7f8530693e4d5129

neutron/services/firewall/drivers/linux/iptables_fwaas.py
neutron/tests/unit/services/firewall/drivers/linux/test_iptables_fwaas.py

index eaed8b00441c0751ac9559611a600bce0b159891..ffc467c7ca47a3062ed49682b8ca1b5309fdb642 100644 (file)
@@ -233,11 +233,7 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
         self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)
 
     def _convert_fwaas_to_iptables_rule(self, rule):
-        if rule.get('action') == 'allow':
-            rule['action'] = 'ACCEPT'
-        else:
-            rule['action'] = 'DROP'
-
+        action = rule.get('action') == 'allow' and 'ACCEPT' or 'DROP'
         args = [self._protocol_arg(rule.get('protocol')),
                 self._port_arg('dport',
                                rule.get('protocol'),
@@ -247,7 +243,7 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
                                rule.get('source_port')),
                 self._ip_prefix_arg('s', rule.get('source_ip_address')),
                 self._ip_prefix_arg('d', rule.get('destination_ip_address')),
-                self._action_arg(rule.get('action'))]
+                self._action_arg(action)]
 
         iptables_rule = ' '.join(args)
         return iptables_rule
index 2550bf1716a66e876bb2317ef9ee62a03b725093..f58a0300ea7cf856f0f0dc48ac499a2ee16ecffd 100644 (file)
@@ -46,23 +46,11 @@ class IptablesFwaasTestCase(base.BaseTestCase):
         self.addCleanup(self.utils_exec_p.stop)
         self.iptables_cls_p = mock.patch(
             'neutron.agent.linux.iptables_manager.IptablesManager')
-        iptables_cls = self.iptables_cls_p.start()
+        self.iptables_cls_p.start()
         self.addCleanup(self.iptables_cls_p.stop)
-        self.iptables_inst = mock.Mock()
-        self.v4filter_inst = mock.Mock()
-        self.v6filter_inst = mock.Mock()
-        self.v4filter_inst.chains = []
-        self.v6filter_inst.chains = []
-        self.iptables_inst.ipv4 = {'filter': self.v4filter_inst}
-        self.iptables_inst.ipv6 = {'filter': self.v6filter_inst}
-        iptables_cls.return_value = self.iptables_inst
-
-        self.router_info_inst = mock.Mock()
-        self.router_info_inst.iptables_manager = self.iptables_inst
-
         self.firewall = fwaas.IptablesFwaasDriver()
 
-    def _fake_rules_v4(self, fwid):
+    def _fake_rules_v4(self, fwid, apply_list):
         rule_list = []
         rule1 = {'enabled': True,
                  'action': 'allow',
@@ -76,9 +64,11 @@ class IptablesFwaasTestCase(base.BaseTestCase):
                  'protocol': 'tcp',
                  'destination_port': '22'}
         ingress_chain = ('iv4%s' % fwid)[:11]
-        self.v4filter_inst.chains.append(ingress_chain)
         egress_chain = ('ov4%s' % fwid)[:11]
-        self.v4filter_inst.chains.append(egress_chain)
+        for router_info_inst in apply_list:
+            v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']
+            v4filter_inst.chains.append(ingress_chain)
+            v4filter_inst.chains.append(egress_chain)
         rule_list.append(rule1)
         rule_list.append(rule2)
         return rule_list
@@ -105,14 +95,25 @@ class IptablesFwaasTestCase(base.BaseTestCase):
                    'firewall_rule_list': rule_list}
         return fw_inst
 
-    def _fake_apply_list(self):
+    def _fake_apply_list(self, router_count=1):
         apply_list = []
-        apply_list.append(self.router_info_inst)
+        while router_count > 0:
+            iptables_inst = mock.Mock()
+            v4filter_inst = mock.Mock()
+            v6filter_inst = mock.Mock()
+            v4filter_inst.chains = []
+            v6filter_inst.chains = []
+            iptables_inst.ipv4 = {'filter': v4filter_inst}
+            iptables_inst.ipv6 = {'filter': v6filter_inst}
+            router_info_inst = mock.Mock()
+            router_info_inst.iptables_manager = iptables_inst
+            apply_list.append(router_info_inst)
+            router_count -= 1
         return apply_list
 
-    def _setup_firewall_with_rules(self, func):
-        apply_list = self._fake_apply_list()
-        rule_list = self._fake_rules_v4(FAKE_FW_ID)
+    def _setup_firewall_with_rules(self, func, router_count=1):
+        apply_list = self._fake_apply_list(router_count=router_count)
+        rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)
         firewall = self._fake_firewall(rule_list)
         func(apply_list, firewall)
         invalid_rule = '-m state --state INVALID -j DROP'
@@ -124,26 +125,32 @@ class IptablesFwaasTestCase(base.BaseTestCase):
         bname = fwaas.iptables_manager.binary_name
         ipt_mgr_ichain = '%s-%s' % (bname, ingress_chain[:11])
         ipt_mgr_echain = '%s-%s' % (bname, egress_chain[:11])
-        calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
-                 call.ensure_remove_chain('ov4fake-fw-uuid'),
-                 call.ensure_remove_chain('fwaas-default-policy'),
-                 call.add_chain('fwaas-default-policy'),
-                 call.add_rule('fwaas-default-policy', '-j DROP'),
-                 call.add_chain(ingress_chain),
-                 call.add_rule(ingress_chain, invalid_rule),
-                 call.add_rule(ingress_chain, est_rule),
-                 call.add_chain(egress_chain),
-                 call.add_rule(egress_chain, invalid_rule),
-                 call.add_rule(egress_chain, est_rule),
-                 call.add_rule(ingress_chain, rule1),
-                 call.add_rule(egress_chain, rule1),
-                 call.add_rule(ingress_chain, rule2),
-                 call.add_rule(egress_chain, rule2),
-                 call.add_rule('FORWARD', '-o qr-+ -j %s' % ipt_mgr_ichain),
-                 call.add_rule('FORWARD', '-i qr-+ -j %s' % ipt_mgr_echain),
-                 call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
-                 call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname)]
-        self.v4filter_inst.assert_has_calls(calls)
+        for router_info_inst in apply_list:
+            v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']
+            calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
+                     call.ensure_remove_chain('ov4fake-fw-uuid'),
+                     call.ensure_remove_chain('fwaas-default-policy'),
+                     call.add_chain('fwaas-default-policy'),
+                     call.add_rule('fwaas-default-policy', '-j DROP'),
+                     call.add_chain(ingress_chain),
+                     call.add_rule(ingress_chain, invalid_rule),
+                     call.add_rule(ingress_chain, est_rule),
+                     call.add_chain(egress_chain),
+                     call.add_rule(egress_chain, invalid_rule),
+                     call.add_rule(egress_chain, est_rule),
+                     call.add_rule(ingress_chain, rule1),
+                     call.add_rule(egress_chain, rule1),
+                     call.add_rule(ingress_chain, rule2),
+                     call.add_rule(egress_chain, rule2),
+                     call.add_rule('FORWARD',
+                                   '-o qr-+ -j %s' % ipt_mgr_ichain),
+                     call.add_rule('FORWARD',
+                                   '-i qr-+ -j %s' % ipt_mgr_echain),
+                     call.add_rule('FORWARD',
+                                   '-o qr-+ -j %s-fwaas-defau' % bname),
+                     call.add_rule('FORWARD',
+                                   '-i qr-+ -j %s-fwaas-defau' % bname)]
+            v4filter_inst.assert_has_calls(calls)
 
     def test_create_firewall_no_rules(self):
         apply_list = self._fake_apply_list()
@@ -167,11 +174,15 @@ class IptablesFwaasTestCase(base.BaseTestCase):
                  call.add_rule(egress_chain, est_rule),
                  call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
                  call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname)]
-        self.v4filter_inst.assert_has_calls(calls)
+        apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)
 
     def test_create_firewall_with_rules(self):
         self._setup_firewall_with_rules(self.firewall.create_firewall)
 
+    def test_create_firewall_with_rules_two_routers(self):
+        self._setup_firewall_with_rules(self.firewall.create_firewall,
+                                        router_count=2)
+
     def test_update_firewall_with_rules(self):
         self._setup_firewall_with_rules(self.firewall.update_firewall)
 
@@ -184,11 +195,11 @@ class IptablesFwaasTestCase(base.BaseTestCase):
         calls = [call.ensure_remove_chain(ingress_chain),
                  call.ensure_remove_chain(egress_chain),
                  call.ensure_remove_chain('fwaas-default-policy')]
-        self.v4filter_inst.assert_has_calls(calls)
+        apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)
 
     def test_create_firewall_with_admin_down(self):
-        rule_list = self._fake_rules_v4(FAKE_FW_ID)
         apply_list = self._fake_apply_list()
+        rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)
         firewall = self._fake_firewall_with_admin_down(rule_list)
         self.firewall.create_firewall(apply_list, firewall)
         calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
@@ -196,4 +207,4 @@ class IptablesFwaasTestCase(base.BaseTestCase):
                  call.ensure_remove_chain('fwaas-default-policy'),
                  call.add_chain('fwaas-default-policy'),
                  call.add_rule('fwaas-default-policy', '-j DROP')]
-        self.v4filter_inst.assert_has_calls(calls)
+        apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)