self.addCleanup(self.utils_exec_p.stop)
self.iptables_cls_p = mock.patch(
'neutron.agent.linux.iptables_manager.IptablesManager')
- iptables_cls = self.iptables_cls_p.start()
+ self.iptables_cls_p.start()
self.addCleanup(self.iptables_cls_p.stop)
- self.iptables_inst = mock.Mock()
- self.v4filter_inst = mock.Mock()
- self.v6filter_inst = mock.Mock()
- self.v4filter_inst.chains = []
- self.v6filter_inst.chains = []
- self.iptables_inst.ipv4 = {'filter': self.v4filter_inst}
- self.iptables_inst.ipv6 = {'filter': self.v6filter_inst}
- iptables_cls.return_value = self.iptables_inst
-
- self.router_info_inst = mock.Mock()
- self.router_info_inst.iptables_manager = self.iptables_inst
-
self.firewall = fwaas.IptablesFwaasDriver()
- def _fake_rules_v4(self, fwid):
+ def _fake_rules_v4(self, fwid, apply_list):
rule_list = []
rule1 = {'enabled': True,
'action': 'allow',
'protocol': 'tcp',
'destination_port': '22'}
ingress_chain = ('iv4%s' % fwid)[:11]
- self.v4filter_inst.chains.append(ingress_chain)
egress_chain = ('ov4%s' % fwid)[:11]
- self.v4filter_inst.chains.append(egress_chain)
+ for router_info_inst in apply_list:
+ v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']
+ v4filter_inst.chains.append(ingress_chain)
+ v4filter_inst.chains.append(egress_chain)
rule_list.append(rule1)
rule_list.append(rule2)
return rule_list
'firewall_rule_list': rule_list}
return fw_inst
- def _fake_apply_list(self):
+ def _fake_apply_list(self, router_count=1):
apply_list = []
- apply_list.append(self.router_info_inst)
+ while router_count > 0:
+ iptables_inst = mock.Mock()
+ v4filter_inst = mock.Mock()
+ v6filter_inst = mock.Mock()
+ v4filter_inst.chains = []
+ v6filter_inst.chains = []
+ iptables_inst.ipv4 = {'filter': v4filter_inst}
+ iptables_inst.ipv6 = {'filter': v6filter_inst}
+ router_info_inst = mock.Mock()
+ router_info_inst.iptables_manager = iptables_inst
+ apply_list.append(router_info_inst)
+ router_count -= 1
return apply_list
- def _setup_firewall_with_rules(self, func):
- apply_list = self._fake_apply_list()
- rule_list = self._fake_rules_v4(FAKE_FW_ID)
+ def _setup_firewall_with_rules(self, func, router_count=1):
+ apply_list = self._fake_apply_list(router_count=router_count)
+ rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall(rule_list)
func(apply_list, firewall)
invalid_rule = '-m state --state INVALID -j DROP'
bname = fwaas.iptables_manager.binary_name
ipt_mgr_ichain = '%s-%s' % (bname, ingress_chain[:11])
ipt_mgr_echain = '%s-%s' % (bname, egress_chain[:11])
- calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
- call.ensure_remove_chain('ov4fake-fw-uuid'),
- call.ensure_remove_chain('fwaas-default-policy'),
- call.add_chain('fwaas-default-policy'),
- call.add_rule('fwaas-default-policy', '-j DROP'),
- call.add_chain(ingress_chain),
- call.add_rule(ingress_chain, invalid_rule),
- call.add_rule(ingress_chain, est_rule),
- call.add_chain(egress_chain),
- call.add_rule(egress_chain, invalid_rule),
- call.add_rule(egress_chain, est_rule),
- call.add_rule(ingress_chain, rule1),
- call.add_rule(egress_chain, rule1),
- call.add_rule(ingress_chain, rule2),
- call.add_rule(egress_chain, rule2),
- call.add_rule('FORWARD', '-o qr-+ -j %s' % ipt_mgr_ichain),
- call.add_rule('FORWARD', '-i qr-+ -j %s' % ipt_mgr_echain),
- call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
- call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname)]
- self.v4filter_inst.assert_has_calls(calls)
+ for router_info_inst in apply_list:
+ v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']
+ calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
+ call.ensure_remove_chain('ov4fake-fw-uuid'),
+ call.ensure_remove_chain('fwaas-default-policy'),
+ call.add_chain('fwaas-default-policy'),
+ call.add_rule('fwaas-default-policy', '-j DROP'),
+ call.add_chain(ingress_chain),
+ call.add_rule(ingress_chain, invalid_rule),
+ call.add_rule(ingress_chain, est_rule),
+ call.add_chain(egress_chain),
+ call.add_rule(egress_chain, invalid_rule),
+ call.add_rule(egress_chain, est_rule),
+ call.add_rule(ingress_chain, rule1),
+ call.add_rule(egress_chain, rule1),
+ call.add_rule(ingress_chain, rule2),
+ call.add_rule(egress_chain, rule2),
+ call.add_rule('FORWARD',
+ '-o qr-+ -j %s' % ipt_mgr_ichain),
+ call.add_rule('FORWARD',
+ '-i qr-+ -j %s' % ipt_mgr_echain),
+ call.add_rule('FORWARD',
+ '-o qr-+ -j %s-fwaas-defau' % bname),
+ call.add_rule('FORWARD',
+ '-i qr-+ -j %s-fwaas-defau' % bname)]
+ v4filter_inst.assert_has_calls(calls)
def test_create_firewall_no_rules(self):
apply_list = self._fake_apply_list()
call.add_rule(egress_chain, est_rule),
call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname)]
- self.v4filter_inst.assert_has_calls(calls)
+ apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)
def test_create_firewall_with_rules(self):
self._setup_firewall_with_rules(self.firewall.create_firewall)
+ def test_create_firewall_with_rules_two_routers(self):
+ self._setup_firewall_with_rules(self.firewall.create_firewall,
+ router_count=2)
+
def test_update_firewall_with_rules(self):
self._setup_firewall_with_rules(self.firewall.update_firewall)
calls = [call.ensure_remove_chain(ingress_chain),
call.ensure_remove_chain(egress_chain),
call.ensure_remove_chain('fwaas-default-policy')]
- self.v4filter_inst.assert_has_calls(calls)
+ apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)
def test_create_firewall_with_admin_down(self):
- rule_list = self._fake_rules_v4(FAKE_FW_ID)
apply_list = self._fake_apply_list()
+ rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)
firewall = self._fake_firewall_with_admin_down(rule_list)
self.firewall.create_firewall(apply_list, firewall)
calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),
call.ensure_remove_chain('fwaas-default-policy'),
call.add_chain('fwaas-default-policy'),
call.add_rule('fwaas-default-policy', '-j DROP')]
- self.v4filter_inst.assert_has_calls(calls)
+ apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)