# list of port which has security group
self.filtered_ports = {}
self._add_fallback_chain_v4v6()
+ self._defer_apply = False
+ self._pre_defer_filtered_ports = None
@property
def ports(self):
def _setup_chains(self):
"""Setup ingress and egress chain for a port."""
+ if not self._defer_apply:
+ self._setup_chains_apply(self.filtered_ports)
+
+ def _setup_chains_apply(self, ports):
self._add_chain_by_name_v4v6(SG_CHAIN)
- for port in self.filtered_ports.values():
+ for port in ports.values():
self._setup_chain(port, INGRESS_DIRECTION)
self._setup_chain(port, EGRESS_DIRECTION)
self.iptables.ipv4['filter'].add_rule(SG_CHAIN, '-j ACCEPT')
def _remove_chains(self):
"""Remove ingress and egress chain for a port."""
- for port in self.filtered_ports.values():
+ if not self._defer_apply:
+ self._remove_chains_apply(self.filtered_ports)
+
+ def _remove_chains_apply(self, ports):
+ for port in ports.values():
self._remove_chain(port, INGRESS_DIRECTION)
self._remove_chain(port, EGRESS_DIRECTION)
self._remove_chain(port, IP_SPOOF_FILTER)
'%s%s' % (CHAIN_NAME_PREFIX[direction], port['device'][3:]))
def filter_defer_apply_on(self):
- self.iptables.defer_apply_on()
+ if not self._defer_apply:
+ self.iptables.defer_apply_on()
+ self._pre_defer_filtered_ports = dict(self.filtered_ports)
+ self._defer_apply = True
def filter_defer_apply_off(self):
- self.iptables.defer_apply_off()
+ if self._defer_apply:
+ self._defer_apply = False
+ self._remove_chains_apply(self._pre_defer_filtered_ports)
+ self._pre_defer_filtered_ports = None
+ self._setup_chains_apply(self.filtered_ports)
+ self.iptables.defer_apply_off()
class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+
import mock
from mock import call
from oslo.config import cfg
self.iptables_inst.assert_has_calls([call.defer_apply_on(),
call.defer_apply_off()])
+ def _mock_chain_applies(self):
+ class CopyingMock(mock.MagicMock):
+ """Copies arguments so mutable arguments can be asserted on.
+
+ Copied verbatim from unittest.mock documentation.
+ """
+ def __call__(self, *args, **kwargs):
+ args = copy.deepcopy(args)
+ kwargs = copy.deepcopy(kwargs)
+ return super(CopyingMock, self).__call__(*args, **kwargs)
+ # Need to use CopyingMock because _{setup,remove}_chains_apply are
+ # usually called with that's modified between calls (i.e.,
+ # self.firewall.filtered_ports).
+ chain_applies = CopyingMock()
+ self.firewall._setup_chains_apply = chain_applies.setup
+ self.firewall._remove_chains_apply = chain_applies.remove
+ return chain_applies
+
+ def test_mock_chain_applies(self):
+ chain_applies = self._mock_chain_applies()
+ port_prepare = {'device': 'd1', 'mac_address': 'prepare'}
+ port_update = {'device': 'd1', 'mac_address': 'update'}
+ self.firewall.prepare_port_filter(port_prepare)
+ self.firewall.update_port_filter(port_update)
+ self.firewall.remove_port_filter(port_update)
+ chain_applies.assert_has_calls([call.remove({}),
+ call.setup({'d1': port_prepare}),
+ call.remove({'d1': port_prepare}),
+ call.setup({'d1': port_update}),
+ call.remove({'d1': port_update}),
+ call.setup({})])
+
+ def test_defer_chain_apply_need_pre_defer_copy(self):
+ chain_applies = self._mock_chain_applies()
+ port = self._fake_port()
+ device2port = {port['device']: port}
+ self.firewall.prepare_port_filter(port)
+ with self.firewall.defer_apply():
+ self.firewall.remove_port_filter(port)
+ chain_applies.assert_has_calls([call.remove({}),
+ call.setup(device2port),
+ call.remove(device2port),
+ call.setup({})])
+
+ def test_defer_chain_apply_coalesce_simple(self):
+ chain_applies = self._mock_chain_applies()
+ port = self._fake_port()
+ with self.firewall.defer_apply():
+ self.firewall.prepare_port_filter(port)
+ self.firewall.update_port_filter(port)
+ self.firewall.remove_port_filter(port)
+ chain_applies.assert_has_calls([call.remove({}), call.setup({})])
+
+ def test_defer_chain_apply_coalesce_multiple_ports(self):
+ chain_applies = self._mock_chain_applies()
+ port1 = {'device': 'd1', 'mac_address': 'mac1'}
+ port2 = {'device': 'd2', 'mac_address': 'mac2'}
+ device2port = {'d1': port1, 'd2': port2}
+ with self.firewall.defer_apply():
+ self.firewall.prepare_port_filter(port1)
+ self.firewall.prepare_port_filter(port2)
+ chain_applies.assert_has_calls([call.remove({}),
+ call.setup(device2port)])
+
def test_ip_spoofing_filter_with_multiple_ips(self):
port = {'device': 'tapfake_dev',
'mac_address': 'ff:ff:ff:ff',