#This is backward compatibility check for Havana
def _is_valid_driver_combination():
return ((cfg.CONF.SECURITYGROUP.enable_security_group and
- cfg.CONF.SECURITYGROUP.firewall_driver !=
- 'neutron.agent.firewall.NoopFirewallDriver') or
+ (cfg.CONF.SECURITYGROUP.firewall_driver and
+ cfg.CONF.SECURITYGROUP.firewall_driver !=
+ 'neutron.agent.firewall.NoopFirewallDriver')) or
(not cfg.CONF.SECURITYGROUP.enable_security_group and
(cfg.CONF.SECURITYGROUP.firewall_driver ==
'neutron.agent.firewall.NoopFirewallDriver' or
- cfg.CONF.SECURITYGROUP.firewall_driver == None)
+ cfg.CONF.SECURITYGROUP.firewall_driver is None)
))
def init_firewall(self, defer_refresh_firewall=False):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
+ if not _is_valid_driver_combination():
+ LOG.warn("Driver configuration doesn't match "
+ "with enable_security_group")
+ if not firewall_driver:
+ firewall_driver = 'neutron.agent.firewall.NoopFirewallDriver'
self.firewall = importutils.import_object(firewall_driver)
# The following flag will be set to true if port filter must not be
# applied as soon as a rule or membership notification is received
[call.security_groups_provider_updated()])
+class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
+ def test_init_firewall_with_none_driver(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', False,
+ group='SECURITYGROUP')
+ agent = sg_rpc.SecurityGroupAgentRpcMixin()
+ agent.init_firewall()
+ self.assertEqual(agent.firewall.__class__.__name__,
+ 'NoopFirewallDriver')
+
+
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()
group='SECURITYGROUP')
self.assertFalse(sg_rpc._is_valid_driver_combination())
+ def test_is_invalid_drvier_combination_sg_enabled_with_none(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', True,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', None,
+ group='SECURITYGROUP')
+ self.assertFalse(sg_rpc._is_valid_driver_combination())
+
def test_is_invalid_drvier_combination_sg_disabled(self):
cfg.CONF.set_override(
'enable_security_group', False,