return []
def _port_chain_name(self, port, direction):
- return '%s%s' % (CHAIN_NAME_PREFIX[direction],
- port['device'][3:])
+ return iptables_manager.get_chain_name(
+ '%s%s' % (CHAIN_NAME_PREFIX[direction], port['device'][3:]))
def filter_defer_apply_on(self):
self.iptables.defer_apply_on()
OVS_HYBRID_TAP_PREFIX = 'tap'
def _port_chain_name(self, port, direction):
- return '%s%s' % (CHAIN_NAME_PREFIX[direction],
- port['device'])
+ return iptables_manager.get_chain_name(
+ '%s%s' % (CHAIN_NAME_PREFIX[direction], port['device']))
def _get_device_name(self, port):
return (self.OVS_HYBRID_TAP_PREFIX + port['device'])[:LINUX_DEV_LEN]
# so we limit it to 16 characters.
# (max_chain_name_length - len('-POSTROUTING') == 16)
binary_name = os.path.basename(inspect.stack()[-1][1])[:16]
+# A length of a chain name must be less than or equal to 11 characters.
+# <max length of iptables chain name> - (<binary_name> + '-') = 28-(16+1) = 11
+MAX_CHAIN_LEN_WRAP = 11
+MAX_CHAIN_LEN_NOWRAP = 28
+
cfg.CONF.set_default('lock_path', '$state_path/lock')
-MAX_CHAIN_LEN = 28
+
+
+def get_chain_name(chain_name, wrap=True):
+ if wrap:
+ return chain_name[:MAX_CHAIN_LEN_WRAP]
+ else:
+ return chain_name[:MAX_CHAIN_LEN_NOWRAP]
class IptablesRule(object):
"""
def __init__(self, chain, rule, wrap=True, top=False):
- self.chain = chain[:MAX_CHAIN_LEN]
+ self.chain = get_chain_name(chain, wrap)
self.rule = rule
self.wrap = wrap
self.top = top
chain = '%s-%s' % (binary_name, self.chain)
else:
chain = self.chain
- chain = chain[:MAX_CHAIN_LEN]
return '-A %s %s' % (chain, self.rule)
end up named 'nova-compute-OUTPUT'.
"""
- name = name[:MAX_CHAIN_LEN]
+ name = get_chain_name(name, wrap)
if wrap:
self.chains.add(name)
else:
This removal "cascades". All rule in the chain are removed, as are
all rules in other chains that jump to it.
"""
- name = name[:MAX_CHAIN_LEN]
+ name = get_chain_name(name, wrap)
chain_set = self._select_chain_set(wrap)
if name not in chain_set:
return
If the chain is not found, this is merely logged.
"""
- name = name[:MAX_CHAIN_LEN]
+ name = get_chain_name(name, wrap)
chain_set = self._select_chain_set(wrap)
if name not in chain_set:
is applied correctly.
"""
+ chain = get_chain_name(chain, wrap)
if wrap and chain not in self.chains:
raise LookupError(_('Unknown chain: %r') % chain)
def _wrap_target_chain(self, s):
if s.startswith('$'):
- return ('%s-%s' % (binary_name, s[1:]))[:MAX_CHAIN_LEN]
+ return ('%s-%s' % (binary_name, s[1:]))
return s
def remove_rule(self, chain, rule, wrap=True, top=False):
CLI tool.
"""
+ chain = get_chain_name(chain, wrap)
try:
self.rules.remove(IptablesRule(chain, rule, wrap, top))
except ValueError:
def empty_chain(self, chain, wrap=True):
"""Remove all rules from a chain."""
- chain = chain[:MAX_CHAIN_LEN]
+ chain = get_chain_name(chain, wrap)
chained_rules = [rule for rule in self.rules
if rule.chain == chain and rule.wrap == wrap]
for rule in chained_rules:
self.assertEqual(iptables_manager.binary_name,
os.path.basename(inspect.stack()[-1][1])[:16])
+ def test_get_chanin_name(self):
+ name = '0123456789' * 5
+ # 28 chars is the maximum length of iptables chain name.
+ self.assertEqual(iptables_manager.get_chain_name(name, wrap=False),
+ name[:28])
+ # 11 chars is the maximum length of chain name of iptable_manager
+ # if binary_name is prepended.
+ self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
+ name[:11])
+
def test_add_and_remove_chain(self):
bn = iptables_manager.binary_name
self.iptables.execute(['iptables-save', '-t', 'filter'],