'NoopFirewallDriver')
-class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
+class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
- super(SecurityGroupAgentRpcTestCase, self).setUp()
- cfg.CONF.set_default('firewall_driver',
- 'neutron.agent.firewall.NoopFirewallDriver',
- group='SECURITYGROUP')
+ super(BaseSecurityGroupAgentRpcTestCase, self).setUp()
+ set_firewall_driver(FIREWALL_NOOP_DRIVER)
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
- rpc = mock.Mock()
- rpc.security_group_info_for_devices.side_effect = (
- messaging.UnsupportedVersion('1.2'))
- self.agent.plugin_rpc = rpc
-
+ self.agent.plugin_rpc = mock.Mock()
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
'fake_sgid1',
'remote_group_id':
'fake_sgid2'}]}
- fake_devices = {'fake_device': self.fake_device}
- self.firewall.ports = fake_devices
- rpc.security_group_rules_for_devices.return_value = fake_devices
+ self.firewall.ports = {'fake_device': self.fake_device}
+
+
+class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
+ def setUp(self, defer_refresh_firewall=False):
+ super(SecurityGroupAgentRpcTestCase, self).setUp(
+ defer_refresh_firewall)
+ rpc = self.agent.plugin_rpc
+ rpc.security_group_info_for_devices.side_effect = (
+ messaging.UnsupportedVersion('1.2'))
+ rpc.security_group_rules_for_devices.return_value = (
+ self.firewall.ports)
def test_prepare_and_remove_devices_filter(self):
self.agent.prepare_devices_filter(['fake_device'])
self.firewall.assert_has_calls([])
-class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
+class SecurityGroupAgentEnhancedRpcTestCase(
+ BaseSecurityGroupAgentRpcTestCase):
+
def setUp(self, defer_refresh_firewall=False):
- super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
- cfg.CONF.set_default('firewall_driver',
- 'neutron.agent.firewall.NoopFirewallDriver',
- group='SECURITYGROUP')
- self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
- self.agent.context = None
- mock.patch('neutron.agent.linux.iptables_manager').start()
- self.agent.root_helper = 'sudo'
- rpc = mock.Mock()
- self.agent.plugin_rpc = rpc
- self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
- self.firewall = mock.Mock()
- firewall_object = firewall_base.FirewallDriver()
- self.firewall.defer_apply.side_effect = firewall_object.defer_apply
- self.agent.firewall = self.firewall
- self.fake_device = {'device': 'fake_device',
- 'security_groups': ['fake_sgid1', 'fake_sgid2'],
- 'security_group_source_groups': ['fake_sgid2'],
- 'security_group_rules': [{'security_group_id':
- 'fake_sgid1',
- 'remote_group_id':
- 'fake_sgid2'}]}
- fake_devices = {'fake_device': self.fake_device}
+ super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp(
+ defer_refresh_firewall=defer_refresh_firewall)
fake_sg_info = {
'security_groups': {
'fake_sgid1': [
{'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
- 'devices': fake_devices}
- self.firewall.ports = fake_devices
- rpc.security_group_info_for_devices.return_value = fake_sg_info
+ 'devices': self.firewall.ports}
+ self.agent.plugin_rpc.security_group_info_for_devices.return_value = (
+ fake_sg_info)
def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_device'])
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
- def setUp(self, defer_refresh_firewall=False):
+ def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
super(TestSecurityGroupAgentWithIptables, self).setUp()
config.register_root_helper(cfg.CONF)
cfg.CONF.set_override(
'lock_path',
'$state_path/lock')
- cfg.CONF.set_override(
- 'firewall_driver',
- self.FIREWALL_DRIVER,
- group='SECURITYGROUP')
+ set_firewall_driver(self.FIREWALL_DRIVER)
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
self.agent.root_helper = 'sudo'
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
- self.rpc.security_group_info_for_devices.side_effect = (
- messaging.UnsupportedVersion('1.2'))
+
+ if test_rpc_v1_1:
+ self.rpc.security_group_info_for_devices.side_effect = (
+ messaging.UnsupportedVersion('1.2'))
self.agent.init_firewall(
defer_refresh_firewall=defer_refresh_firewall)
TestSecurityGroupAgentWithIptables):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
- defer_refresh_firewall)
- self.rpc = mock.Mock()
- self.agent.plugin_rpc = self.rpc
+ defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)
self.sg_info = self.rpc.security_group_info_for_devices
- self.agent.init_firewall(
- defer_refresh_firewall=defer_refresh_firewall)
- self.iptables = self.agent.firewall.iptables
- self.iptables_execute = mock.patch.object(self.iptables,
- "execute").start()
- self.iptables_execute_return_values = []
- self.expected_call_count = 0
- self.expected_calls = []
- self.expected_process_inputs = []
- self.iptables_execute.side_effect = (
- self.iptables_execute_return_values)
rule1 = [{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,