return [rule for rule in self.rules
if rule.chain == chain and rule.wrap == wrap]
- def is_chain_empty(self, chain, wrap=True):
- return not self._get_chain_rules(chain, wrap)
-
def empty_chain(self, chain, wrap=True):
"""Remove all rules from a chain."""
chained_rules = self._get_chain_rules(chain, wrap)
self.ipv4['nat'].add_chain('float-snat')
self.ipv4['nat'].add_rule('snat', '-j $float-snat')
- def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
+ def get_chain(self, table, chain, ip_version=4, wrap=True):
try:
requested_table = {4: self.ipv4, 6: self.ipv6}[ip_version][table]
except KeyError:
- return True
- return requested_table.is_chain_empty(chain, wrap)
+ return []
+ return requested_table._get_chain_rules(chain, wrap)
+
+ def is_chain_empty(self, table, chain, ip_version=4, wrap=True):
+ return not self.get_chain(table, chain, ip_version, wrap)
def defer_apply_on(self):
self.iptables_apply_deferred = True
self._assert_floating_ips(router)
self._assert_snat_chains(router)
self._assert_floating_ip_chains(router)
+ self._assert_metadata_chains(router)
if enable_ha:
self._assert_ha_device(router)
self.assertFalse(router.iptables_manager.is_chain_empty(
'nat', 'float-snat'))
+ def _get_rule(self, iptables_manager, table, chain, predicate):
+ rules = iptables_manager.get_chain(table, chain)
+ result = next(rule for rule in rules if predicate(rule))
+ return result
+
+ def _assert_metadata_chains(self, router):
+ metadata_port_filter = lambda rule: (
+ str(self.agent.conf.metadata_port) in rule.rule)
+ self.assertTrue(self._get_rule(router.iptables_manager,
+ 'nat',
+ 'PREROUTING',
+ metadata_port_filter))
+ self.assertTrue(self._get_rule(router.iptables_manager,
+ 'filter',
+ 'INPUT',
+ metadata_port_filter))
+
def _assert_router_does_not_exist(self, router):
# If the namespace assertion succeeds
# then the devices and iptable rules have also been deleted,