# Firewall driver for realizing neutron security group function
# firewall_driver = neutron.agent.firewall.NoopFirewallDriver
# Example: firewall_driver = neutron.agent.linux.iptables_firewall.IptablesFirewallDriver
+
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
#
# vxlan_group =
# Example: vxlan_group = 239.1.1.1
+
+[security_group]
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
# agents.
#
# rpc_support_old_agents = False
+
+[securitygroup]
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
# Firewall driver for realizing neutron security group function
firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
+
[ofc]
# Specify OpenFlow Controller Host, Port and Driver to connect.
# host = 127.0.0.1
# firewall_driver = neutron.agent.firewall.NoopFirewallDriver
# Example: firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
+
#-----------------------------------------------------------------------------
# Sample Configurations.
#-----------------------------------------------------------------------------
# Firewall driver for realizing neutron security group function
# firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
+# Controls if neutron security group is enabled or not.
+# It should be false when you use nova security group.
+# enable_security_group = True
+
[agent]
# Agent's polling interval in seconds
# polling_interval = 2
security_group_opts = [
cfg.StrOpt(
'firewall_driver',
- default='neutron.agent.firewall.NoopFirewallDriver',
- help=_('Driver for Security Groups Firewall'))
+ default=None,
+ help=_('Driver for security groups firewall in the L2 agent')),
+ cfg.BoolOpt(
+ 'enable_security_group',
+ default=True,
+ help=_(
+ 'Controls whether the neutron security group API is enabled '
+ 'in the server. It should be false when using no security '
+ 'groups or using the nova security group API.'))
]
cfg.CONF.register_opts(security_group_opts, 'SECURITYGROUP')
+#This is backward compatibility check for Havana
+def _is_valid_driver_combination():
+ return ((cfg.CONF.SECURITYGROUP.enable_security_group and
+ cfg.CONF.SECURITYGROUP.firewall_driver !=
+ 'neutron.agent.firewall.NoopFirewallDriver') or
+ (not cfg.CONF.SECURITYGROUP.enable_security_group and
+ (cfg.CONF.SECURITYGROUP.firewall_driver ==
+ 'neutron.agent.firewall.NoopFirewallDriver' or
+ cfg.CONF.SECURITYGROUP.firewall_driver == None)
+ ))
+
+
def is_firewall_enabled():
- return (cfg.CONF.SECURITYGROUP.firewall_driver !=
- 'neutron.agent.firewall.NoopFirewallDriver')
+ if not _is_valid_driver_combination():
+ LOG.warn("Driver configuration don't match with enable_security_group")
+
+ return cfg.CONF.SECURITYGROUP.enable_security_group
+
+
+def _disable_extension(extension, aliases):
+ if extension in aliases:
+ aliases.remove(extension)
-def disable_security_group_extension_if_noop_driver(
- supported_extension_aliases):
+def disable_security_group_extension_by_config(aliases):
if not is_firewall_enabled():
- LOG.debug(_('Disabled security-group extension.'))
- supported_extension_aliases.remove('security-group')
+ LOG.info(_('Disabled security-group extension.'))
+ _disable_extension('security-group', aliases)
+ LOG.info(_('Disabled allowed-address-pairs extension.'))
+ _disable_extension('allowed-address-pairs', aliases)
class SecurityGroupServerRpcApiMixin(object):
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self.remove_packet_filter_extension_if_disabled(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
def supported_extension_aliases(self):
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
- sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ sg_rpc.disable_security_group_extension_by_config(aliases)
self._aliases = aliases
return self._aliases
test_plugin.NeutronDbPluginV2TestCase):
def setUp(self, plugin_name=None):
+ if hasattr(self, 'HAS_PORT_FILTER'):
+ cfg.CONF.set_override(
+ 'enable_security_group', self.HAS_PORT_FILTER, 'SECURITYGROUP')
self.setup_config_files()
self.setup_patches()
if plugin_name:
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = False
+ def setUp(self, plugin_name=None):
+ super(TestBigSwitchProxyPortsV2,
+ self).setUp(self._plugin_name)
+
def test_update_port_status_build(self):
with self.port() as port:
self.assertEqual(port['port']['status'], 'BUILD')
mock.patch('neutron.openstack.common.loopingcall.'
'FixedIntervalLoopingCall',
new=MockFixedIntervalLoopingCall)
-
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
self.agent = hyperv_neutron_agent.HyperVNeutronAgent()
self.agent.plugin_rpc = mock.Mock()
self.agent.sec_groups_agent = mock.MagicMock()
super(TestLinuxBridgeAgent, self).setUp()
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
self.execute_p = mock.patch.object(ip_lib.IPWrapper, '_execute')
self.execute = self.execute_p.start()
self.addCleanup(self.execute_p.stop)
import contextlib
import mock
+from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.extensions import portbindings
test_bindings.PortBindingsTestCase):
VIF_TYPE = portbindings.VIF_TYPE_BRIDGE
HAS_PORT_FILTER = True
+ ENABLE_SG = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_IPTABLES_DRIVER
def setUp(self):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ cfg.CONF.set_override(
+ 'enable_security_group', self.ENABLE_SG,
+ group='SECURITYGROUP')
super(TestLinuxBridgePortBinding, self).setUp()
class TestLinuxBridgePortBindingNoSG(TestLinuxBridgePortBinding):
HAS_PORT_FILTER = False
+ ENABLE_SG = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
# to bind port
VIF_TYPE = portbindings.VIF_TYPE_UNBOUND
HAS_PORT_FILTER = False
+ ENABLE_SG = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
def setUp(self, firewall_driver=None):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ config.cfg.CONF.set_override(
+ 'enable_security_group', self.ENABLE_SG,
+ group='SECURITYGROUP')
super(TestMl2PortBinding, self).setUp()
def _check_port_binding_profile(self, port, profile=None):
class TestMl2PortBindingNoSG(TestMl2PortBinding):
HAS_PORT_FILTER = False
+ ENABLE_SG = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
class MockFixedIntervalLoopingCall(object):
def __init__(self, f):
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
from webob import exc
from neutron.extensions import portbindings
class TestMlnxPortBinding(MlnxPluginV2TestCase,
test_bindings.PortBindingsTestCase):
VIF_TYPE = constants.VIF_TYPE_DIRECT
+ ENABLE_SG = False
HAS_PORT_FILTER = False
+ def setUp(self, firewall_driver=None):
+ cfg.CONF.set_override(
+ 'enable_security_group', self.ENABLE_SG,
+ group='SECURITYGROUP')
+ super(TestMlnxPortBinding, self).setUp()
+
def _check_default_port_binding_profole(self, port,
expected_vif_type=None):
if expected_vif_type is None:
class TestMlnxPortBindingNoSG(TestMlnxPortBinding):
HAS_PORT_FILTER = False
+ ENABLE_SG = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
def setUp(self):
super(TestNecAgentBase, self).setUp()
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('host', 'dummy-host')
#
# @author: Akihiro Motoki, NEC Corporation
+from oslo.config import cfg
from testtools import matchers
from webob import exc
test_nec_plugin.NecPluginV2TestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
+ ENABLE_SG = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
def setUp(self):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ cfg.CONF.set_override(
+ 'enable_security_group', self.ENABLE_SG,
+ group='SECURITYGROUP')
super(TestNecPortBinding, self).setUp()
class TestNecPortBindingNoSG(TestNecPortBinding):
HAS_PORT_FILTER = False
+ ENABLE_SG = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
def setUp(self):
super(OFAAgentTestCase, self).setUp()
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start()
self.mod_agent = importutils.import_module(self._AGENT_NAME)
self.ryuapp = mock.Mock()
def setUp(self):
super(TestOneConvergenceAgentBase, self).setUp()
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
with contextlib.nested(
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
+
from neutron.extensions import portbindings
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin
test_bindings.PortBindingsTestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
+ ENABLE_SG = True
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
def setUp(self, firewall_driver=None):
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ cfg.CONF.set_override(
+ 'enable_security_group', self.ENABLE_SG,
+ group='SECURITYGROUP')
super(TestOpenvswitchPortBinding, self).setUp()
class TestOpenvswitchPortBindingNoSG(TestOpenvswitchPortBinding):
HAS_PORT_FILTER = False
+ ENABLE_SG = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
def setUp(self):
super(TunnelTest, self).setUp()
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
cfg.CONF.set_override('rpc_backend',
'neutron.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('report_interval', 0, 'AGENT')
from testtools import matchers
import webob.exc
+from neutron.agent.common import config
from neutron.agent import firewall as firewall_base
from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc
class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self, plugin=None):
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
super(SGServerRpcCallBackMixinTestCase, self).setUp(plugin)
self.rpc = FakeSGCallback()
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()
+ cfg.CONF.set_default('firewall_driver',
+ 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentWithIptables, self).setUp()
+ config.register_root_helper(cfg.CONF)
+ cfg.CONF.set_override(
+ 'lock_path',
+ '$state_path/lock')
cfg.CONF.set_override(
'firewall_driver',
self.FIREWALL_DRIVER,
class TestSecurityGroupExtensionControl(base.BaseTestCase):
- def test_firewall_enabled_noop_driver(self):
- set_firewall_driver(FIREWALL_NOOP_DRIVER)
- self.assertFalse(sg_rpc.is_firewall_enabled())
-
- def test_firewall_enabled_iptables_driver(self):
- set_firewall_driver(FIREWALL_IPTABLES_DRIVER)
- self.assertTrue(sg_rpc.is_firewall_enabled())
-
- def test_disable_security_group_extension_noop_driver(self):
- set_firewall_driver(FIREWALL_NOOP_DRIVER)
+ def test_disable_security_group_extension_by_config(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', False,
+ group='SECURITYGROUP')
exp_aliases = ['dummy1', 'dummy2']
ext_aliases = ['dummy1', 'security-group', 'dummy2']
- sg_rpc.disable_security_group_extension_if_noop_driver(ext_aliases)
+ sg_rpc.disable_security_group_extension_by_config(ext_aliases)
self.assertEqual(ext_aliases, exp_aliases)
- def test_disable_security_group_extension_iptables_driver(self):
- set_firewall_driver(FIREWALL_IPTABLES_DRIVER)
+ def test_enable_security_group_extension_by_config(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', True,
+ group='SECURITYGROUP')
exp_aliases = ['dummy1', 'security-group', 'dummy2']
ext_aliases = ['dummy1', 'security-group', 'dummy2']
- sg_rpc.disable_security_group_extension_if_noop_driver(ext_aliases)
+ sg_rpc.disable_security_group_extension_by_config(ext_aliases)
self.assertEqual(ext_aliases, exp_aliases)
+
+ def test_is_invalid_drvier_combination_sg_enabled(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', True,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
+ self.assertFalse(sg_rpc._is_valid_driver_combination())
+
+ def test_is_invalid_drvier_combination_sg_disabled(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', False,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', 'NonNoopDriver',
+ group='SECURITYGROUP')
+ self.assertFalse(sg_rpc._is_valid_driver_combination())
+
+ def test_is_valid_drvier_combination_sg_enabled(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', True,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', 'NonNoopDriver',
+ group='SECURITYGROUP')
+ self.assertTrue(sg_rpc._is_valid_driver_combination())
+
+ def test_is_valid_drvier_combination_sg_disabled(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', False,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', 'neutron.agent.firewall.NoopFirewallDriver',
+ group='SECURITYGROUP')
+ self.assertTrue(sg_rpc._is_valid_driver_combination())
+
+ def test_is_valid_drvier_combination_sg_disabled_with_none(self):
+ cfg.CONF.set_override(
+ 'enable_security_group', False,
+ group='SECURITYGROUP')
+ cfg.CONF.set_override(
+ 'firewall_driver', None,
+ group='SECURITYGROUP')
+ self.assertTrue(sg_rpc._is_valid_driver_combination())