'remote_group_id':
'fake_sgid2'}]}
self.firewall.ports = {'fake_device': self.fake_device}
+ self.firewall.security_group_updated = mock.Mock()
class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
[mock.call.refresh_firewall([self.fake_device['device']])])
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_rule_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_member_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
[mock.call.refresh_firewall([self.fake_device['device']])])
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_member_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_member_updated(['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_provider_updated(self):
self.agent.refresh_firewall = mock.Mock()
])
def test_security_groups_rule_updated_enhanced_rpc(self):
+ sg_list = ['fake_sgid1', 'fake_sgid3']
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
- self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
+ self.agent.security_groups_rule_updated(sg_list)
self.agent.refresh_firewall.assert_called_once_with(
[self.fake_device['device']])
+ self.firewall.security_group_updated.assert_called_once_with(
+ 'sg_rule', set(sg_list))
def test_security_groups_rule_not_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_member_updated_enhanced_rpc(self):
+ sg_list = ['fake_sgid2', 'fake_sgid3']
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
- self.agent.security_groups_member_updated(
- ['fake_sgid2', 'fake_sgid3'])
-
+ self.agent.security_groups_member_updated(sg_list)
self.agent.refresh_firewall.assert_called_once_with(
[self.fake_device['device']])
+ self.firewall.security_group_updated.assert_called_once_with(
+ 'sg_member', set(sg_list))
def test_security_groups_member_not_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.security_groups_member_updated(
['fake_sgid3', 'fake_sgid4'])
self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_provider_updated_enhanced_rpc(self):
self.agent.refresh_firewall = mock.Mock()
def test_security_groups_rule_updated(self):
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_multiple_security_groups_rule_updated_same_port(self):
with self.add_fake_device(device='fake_device_2',
self.agent.security_groups_rule_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_rule_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
'fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_multiple_security_groups_rule_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
self.agent.security_groups_rule_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_member_updated(self):
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_multiple_security_groups_member_updated_same_port(self):
with self.add_fake_device(device='fake_device_2',
'fake_sgid3'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_member_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
self.agent.security_groups_member_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_multiple_security_groups_member_updated_multiple_ports(self):
with self.add_fake_device(device='fake_device_2',
self.agent.security_groups_member_updated(['fake_sgid2'])
self.assertIn('fake_device', self.agent.devices_to_refilter)
self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_security_groups_provider_updated(self):
self.agent.security_groups_provider_updated(None)
self.agent.prepare_devices_filter.assert_called_once_with(
set(['fake_new_device']))
self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_updated_ports_only(self):
self.agent.prepare_devices_filter = mock.Mock()
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_updated_device']))
self.assertFalse(self.agent.prepare_devices_filter.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filter_new_and_updated_ports(self):
self.agent.prepare_devices_filter = mock.Mock()
set(['fake_new_device']))
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_updated_device']))
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_sg_updates_only(self):
self.agent.prepare_devices_filter = mock.Mock()
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device']))
self.assertFalse(self.agent.prepare_devices_filter.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_sg_updates_and_new_ports(self):
self.agent.prepare_devices_filter = mock.Mock()
set(['fake_new_device']))
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device']))
+ self.assertFalse(self.firewall.security_group_updated.called)
def _test_prepare_devices_filter(self, devices):
# simulate an RPC arriving and calling _security_group_updated()
self.assertFalse(self.agent.global_refresh_firewall)
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device']))
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_sg_updates_and_updated_ports(self):
self.agent.prepare_devices_filter = mock.Mock()
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device', 'fake_device_2', 'fake_updated_device']))
self.assertFalse(self.agent.prepare_devices_filter.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_all_updates(self):
self.agent.prepare_devices_filter = mock.Mock()
set(['fake_new_device']))
self.agent.refresh_firewall.assert_called_once_with(
set(['fake_device', 'fake_device_2', 'fake_updated_device']))
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_no_update(self):
self.agent.prepare_devices_filter = mock.Mock()
self.assertFalse(self.agent.global_refresh_firewall)
self.assertFalse(self.agent.refresh_firewall.called)
self.assertFalse(self.agent.prepare_devices_filter.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
def test_setup_port_filters_with_global_refresh(self):
self.agent.prepare_devices_filter = mock.Mock()
self.assertFalse(self.agent.global_refresh_firewall)
self.agent.refresh_firewall.assert_called_once_with()
self.assertFalse(self.agent.prepare_devices_filter.called)
+ self.assertFalse(self.firewall.security_group_updated.called)
class FakeSGNotifierAPI(sg_rpc.SecurityGroupAgentRpcApiMixin):
'12:34:56:78:9a:bd',
rule5))
])
+ self.agent.firewall.security_group_updated = mock.Mock()
@staticmethod
def _enforce_order_in_firewall(firewall):
self.expected_calls.append(mock.call(*args, **kwargs))
self.expected_call_count += 1
- def _verify_mock_calls(self):
+ def _verify_mock_calls(self, exp_fw_sg_updated_call=False):
self.assertEqual(self.expected_call_count,
self.iptables_execute.call_count)
self.iptables_execute.assert_has_calls(self.expected_calls)
for e in expected:
self.utils_exec.assert_any_call(['sysctl', '-w', e],
run_as_root=True)
+ self.assertEqual(exp_fw_sg_updated_call,
+ self.agent.firewall.security_group_updated.called)
def _replay_iptables(self, v4_filter, v6_filter, raw):
self._register_mock_call(
self.agent.remove_devices_filter(['tap_port2'])
self.agent.remove_devices_filter(['tap_port1'])
- self._verify_mock_calls()
+ self._verify_mock_calls(True)
+ self.assertEqual(
+ 2, self.agent.firewall.security_group_updated.call_count)
def test_security_group_rule_updated(self):
self.sg_info.return_value = self.devices_info2
self.sg_info.return_value = self.devices_info3
self.agent.security_groups_rule_updated(['security_group1'])
- self._verify_mock_calls()
+ self._verify_mock_calls(True)
+ self.agent.firewall.security_group_updated.assert_called_with(
+ 'sg_rule', set(['security_group1']))
class TestSecurityGroupAgentEnhancedIpsetWithIptables(
self.agent.remove_devices_filter(['tap_port2'])
self.agent.remove_devices_filter(['tap_port1'])
- self._verify_mock_calls()
+ self._verify_mock_calls(True)
+ self.assertEqual(
+ 2, self.agent.firewall.security_group_updated.call_count)
def test_security_group_rule_updated(self):
self.sg_info.return_value = self.devices_info2
self.sg_info.return_value = self.devices_info3
self.agent.security_groups_rule_updated(['security_group1'])
- self._verify_mock_calls()
+ self._verify_mock_calls(True)
+ self.agent.firewall.security_group_updated.assert_called_with(
+ 'sg_rule', set(['security_group1']))
class SGNotificationTestMixin(object):