import inspect
import os
+import re
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
+from neutron.openstack.common import excutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
MAX_CHAIN_LEN_WRAP = 11
MAX_CHAIN_LEN_NOWRAP = 28
+# Number of iptables rules to print before and after a rule that causes a
+# a failure during iptables-restore
+IPTABLES_ERROR_LINES_OF_CONTEXT = 5
+
def get_chain_name(chain_name, wrap=True):
if wrap:
args = ['%s-restore' % (cmd,), '-c']
if self.namespace:
args = ['ip', 'netns', 'exec', self.namespace] + args
- self.execute(args, process_input='\n'.join(all_lines),
- root_helper=self.root_helper)
+ try:
+ self.execute(args, process_input='\n'.join(all_lines),
+ root_helper=self.root_helper)
+ except RuntimeError as r_error:
+ with excutils.save_and_reraise_exception():
+ try:
+ line_no = int(re.search(
+ 'iptables-restore: line ([0-9]+?) failed',
+ str(r_error)).group(1))
+ context = IPTABLES_ERROR_LINES_OF_CONTEXT
+ log_start = max(0, line_no - context)
+ log_end = line_no + context
+ except AttributeError:
+ # line error wasn't found, print all lines instead
+ log_start = 0
+ log_end = len(all_lines)
+ log_lines = ('%7d. %s' % (idx, l)
+ for idx, l in enumerate(
+ all_lines[log_start:log_end],
+ log_start + 1)
+ )
+ LOG.error(_("IPTablesManager.apply failed to apply the "
+ "following set of iptables rules:\n%s"),
+ '\n'.join(log_lines))
LOG.debug(_("IPTablesManager.apply completed with success"))
def _find_table(self, lines, table_name):
{'wrap': True, 'top': False, 'rule': '-j DROP',
'chain': 'nonexistent'})
+ def test_iptables_failure_with_no_failing_line_number(self):
+ with mock.patch.object(iptables_manager, "LOG") as log:
+ # generate Runtime errors on iptables-restore calls
+ def iptables_restore_failer(*args, **kwargs):
+ if 'iptables-restore' in args[0]:
+ self.input_lines = kwargs['process_input'].split('\n')
+ # don't provide a specific failure message so all lines
+ # are logged
+ raise RuntimeError()
+ return FILTER_DUMP
+ self.execute.side_effect = iptables_restore_failer
+ # _apply_synchronized calls iptables-restore so it should raise
+ # a RuntimeError
+ self.assertRaises(RuntimeError,
+ self.iptables._apply_synchronized)
+ # The RuntimeError should have triggered a log of the input to the
+ # process that it failed to execute. Verify by comparing the log
+ # call to the 'process_input' arg given to the failed iptables-restore
+ # call.
+ # Failure without a specific line number in the error should cause
+ # all lines to be logged with numbers.
+ logged = ['%7d. %s' % (n, l)
+ for n, l in enumerate(self.input_lines, 1)]
+ log.error.assert_called_once_with(_(
+ 'IPTablesManager.apply failed to apply the '
+ 'following set of iptables rules:\n%s'),
+ '\n'.join(logged)
+ )
+
+ def test_iptables_failure_on_specific_line(self):
+ with mock.patch.object(iptables_manager, "LOG") as log:
+ # generate Runtime errors on iptables-restore calls
+ def iptables_restore_failer(*args, **kwargs):
+ if 'iptables-restore' in args[0]:
+ self.input_lines = kwargs['process_input'].split('\n')
+ # pretend line 11 failed
+ msg = ("Exit code: 1\nStdout: ''\n"
+ "Stderr: 'iptables-restore: line 11 failed\n'")
+ raise RuntimeError(msg)
+ return FILTER_DUMP
+ self.execute.side_effect = iptables_restore_failer
+ # _apply_synchronized calls iptables-restore so it should raise
+ # a RuntimeError
+ self.assertRaises(RuntimeError,
+ self.iptables._apply_synchronized)
+ # The RuntimeError should have triggered a log of the input to the
+ # process that it failed to execute. Verify by comparing the log
+ # call to the 'process_input' arg given to the failed iptables-restore
+ # call.
+ # Line 11 of the input was marked as failing so lines (11 - context)
+ # to (11 + context) should be logged
+ ctx = iptables_manager.IPTABLES_ERROR_LINES_OF_CONTEXT
+ log_start = max(0, 11 - ctx)
+ log_end = 11 + ctx
+ logged = ['%7d. %s' % (n, l)
+ for n, l in enumerate(self.input_lines[log_start:log_end],
+ log_start + 1)]
+ log.error.assert_called_once_with(_(
+ 'IPTablesManager.apply failed to apply the '
+ 'following set of iptables rules:\n%s'),
+ '\n'.join(logged)
+ )
+
def test_get_traffic_counters_chain_notexists(self):
with mock.patch.object(iptables_manager, "LOG") as log:
acc = self.iptables.get_traffic_counters('chain1')