raise LookupError(_('Unknown chain: %r') % chain)
if '$' in rule:
- rule = ' '.join(map(self._wrap_target_chain, rule.split(' ')))
+ rule = ' '.join(
+ self._wrap_target_chain(e, wrap) for e in rule.split(' '))
self.rules.append(IptablesRule(chain, rule, wrap, top, self.wrap_name,
tag))
- def _wrap_target_chain(self, s):
+ def _wrap_target_chain(self, s, wrap):
if s.startswith('$'):
- return ('%s-%s' % (self.wrap_name, s[1:]))
+ s = ('%s-%s' % (self.wrap_name, get_chain_name(s[1:], wrap)))
+
return s
def remove_rule(self, chain, rule, wrap=True, top=False):
"""
chain = get_chain_name(chain, wrap)
try:
+ if '$' in rule:
+ rule = ' '.join(
+ self._wrap_target_chain(e, wrap) for e in rule.split(' '))
+
self.rules.remove(IptablesRule(chain, rule, wrap, top,
self.wrap_name))
if not wrap:
tools.verify_mock_calls(self.execute, expected_calls_and_values)
+ def test_rule_with_wrap_target(self):
+ name = '0123456789' * 5
+ wrap = "%s-%s" % (iptables_manager.binary_name,
+ iptables_manager.get_chain_name(name))
+
+ iptables_args = {'bn': iptables_manager.binary_name,
+ 'wrap': wrap}
+
+ filter_dump_mod = ('# Generated by iptables_manager\n'
+ '*filter\n'
+ ':neutron-filter-top - [0:0]\n'
+ ':%(bn)s-FORWARD - [0:0]\n'
+ ':%(bn)s-INPUT - [0:0]\n'
+ ':%(bn)s-local - [0:0]\n'
+ ':%(wrap)s - [0:0]\n'
+ ':%(bn)s-OUTPUT - [0:0]\n'
+ '[0:0] -A FORWARD -j neutron-filter-top\n'
+ '[0:0] -A OUTPUT -j neutron-filter-top\n'
+ '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
+ '[0:0] -A INPUT -j %(bn)s-INPUT\n'
+ '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
+ '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
+ '[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j '
+ '%(wrap)s\n'
+ 'COMMIT\n'
+ '# Completed by iptables_manager\n'
+ % iptables_args)
+
+ expected_calls_and_values = [
+ (mock.call(['iptables-save', '-c'],
+ root_helper=self.root_helper),
+ ''),
+ (mock.call(['iptables-restore', '-c'],
+ process_input=NAT_DUMP + filter_dump_mod,
+ root_helper=self.root_helper),
+ None),
+ (mock.call(['iptables-save', '-c'],
+ root_helper=self.root_helper),
+ ''),
+ (mock.call(['iptables-restore', '-c'],
+ process_input=NAT_DUMP + FILTER_DUMP,
+ root_helper=self.root_helper),
+ None),
+ ]
+ tools.setup_mock_calls(self.execute, expected_calls_and_values)
+
+ self.iptables.ipv4['filter'].add_chain(name)
+ self.iptables.ipv4['filter'].add_rule('INPUT',
+ '-s 0/0 -d 192.168.0.2 -j'
+ ' $%s' % name)
+ self.iptables.apply()
+
+ self.iptables.ipv4['filter'].remove_rule('INPUT',
+ '-s 0/0 -d 192.168.0.2 -j'
+ ' $%s' % name)
+ self.iptables.ipv4['filter'].remove_chain(name)
+
+ self.iptables.apply()
+
+ tools.verify_mock_calls(self.execute, expected_calls_and_values)
+
def test_add_nat_rule(self):
nat_dump = ('# Generated by iptables_manager\n'
'*nat\n'