return rules_index
+ def _find_last_entry(self, filter_list, match_str):
+ # find a matching entry, starting from the bottom
+ for s in reversed(filter_list):
+ s = s.strip()
+ if match_str in s:
+ return s
+
def _modify_rules(self, current_lines, table, table_name):
unwrapped_chains = table.unwrapped_chains
chains = table.chains
for chain in all_chains:
chain_str = str(chain).strip()
- orig_filter = [s for s in old_filter if chain_str in s.strip()]
- dup_filter = [s for s in new_filter if chain_str in s.strip()]
+ old = self._find_last_entry(old_filter, chain_str)
+ if not old:
+ dup = self._find_last_entry(new_filter, chain_str)
new_filter = [s for s in new_filter if chain_str not in s.strip()]
# if no old or duplicates, use original chain
- if orig_filter:
- # grab the last entry, if there is one
- old = orig_filter[-1]
- chain_str = str(old).strip()
- elif dup_filter:
- # grab the last entry, if there is one
- dup = dup_filter[-1]
- chain_str = str(dup).strip()
+ if old or dup:
+ chain_str = str(old or dup)
else:
# add-on the [packet:bytes]
chain_str += ' - [0:0]'
# Further down, we weed out duplicates from the bottom of the
# list, so here we remove the dupes ahead of time.
- orig_filter = [s for s in old_filter if rule_str in s.strip()]
- dup_filter = [s for s in new_filter if rule_str in s.strip()]
+ old = self._find_last_entry(old_filter, rule_str)
+ if not old:
+ dup = self._find_last_entry(new_filter, rule_str)
new_filter = [s for s in new_filter if rule_str not in s.strip()]
# if no old or duplicates, use original rule
- if orig_filter:
- # grab the last entry, if there is one
- old = orig_filter[-1]
- rule_str = str(old).strip()
- elif dup_filter:
- # grab the last entry, if there is one
- dup = dup_filter[-1]
- rule_str = str(dup).strip()
+ if old or dup:
+ rule_str = str(old or dup)
# backup one index so we write the array correctly
- rules_index -= 1
+ if not old:
+ rules_index -= 1
else:
# add-on the [packet:bytes]
rule_str = '[0:0] ' + rule_str
tools.verify_mock_calls(self.execute, expected_calls_and_values)
+ def _test_find_last_entry(self, find_str):
+ filter_list = [':neutron-filter-top - [0:0]',
+ ':%(bn)s-FORWARD - [0:0]',
+ ':%(bn)s-INPUT - [0:0]',
+ ':%(bn)s-local - [0:0]',
+ ':%(wrap)s - [0:0]',
+ ':%(bn)s-OUTPUT - [0:0]',
+ '[0:0] -A FORWARD -j neutron-filter-top',
+ '[0:0] -A OUTPUT -j neutron-filter-top'
+ % IPTABLES_ARG]
+
+ return self.iptables._find_last_entry(filter_list, find_str)
+
+ def test_find_last_entry_old_dup(self):
+ find_str = 'neutron-filter-top'
+ match_str = '[0:0] -A OUTPUT -j neutron-filter-top'
+ ret_str = self._test_find_last_entry(find_str)
+ self.assertEqual(ret_str, match_str)
+
+ def test_find_last_entry_none(self):
+ find_str = 'neutron-filter-NOTFOUND'
+ ret_str = self._test_find_last_entry(find_str)
+ self.assertIsNone(ret_str)
+
class IptablesManagerStateLessTestCase(base.BaseTestCase):