cfg.CONF.register_opts(security_group_opts, 'SECURITYGROUP')
+def is_firewall_enabled():
+ return (cfg.CONF.SECURITYGROUP.firewall_driver !=
+ 'quantum.agent.firewall.NoopFirewallDriver')
+
+
+def disable_security_group_extension_if_noop_driver(
+ supported_extension_aliases):
+ if not is_firewall_enabled():
+ LOG.debug(_('Disabled security-group extension.'))
+ supported_extension_aliases.remove('security-group')
+
+
class SecurityGroupServerRpcApiMixin(object):
"""A mix-in that enable SecurityGroup support in plugin rpc
"""
__native_pagination_support = True
__native_sorting_support = True
- supported_extension_aliases = ["provider", "router", "binding", "quotas",
- "security-group", "agent", "extraroute",
- "agent_scheduler"]
+ _supported_extension_aliases = ["provider", "router", "binding", "quotas",
+ "security-group", "agent", "extraroute",
+ "agent_scheduler"]
+
+ @property
+ def supported_extension_aliases(self):
+ if not hasattr(self, '_aliases'):
+ aliases = self._supported_extension_aliases[:]
+ sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ self._aliases = aliases
+ return self._aliases
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
The port binding extension enables an external application relay
information to and from the plugin.
"""
-
- supported_extension_aliases = ["router", "quotas", "binding",
- "security-group", "extraroute",
- "agent", "agent_scheduler",
- ]
+ _supported_extension_aliases = ["router", "quotas", "binding",
+ "security-group", "extraroute",
+ "agent", "agent_scheduler",
+ ]
+
+ @property
+ def supported_extension_aliases(self):
+ if not hasattr(self, '_aliases'):
+ aliases = self._supported_extension_aliases[:]
+ sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ self._aliases = aliases
+ return self._aliases
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
__native_pagination_support = True
__native_sorting_support = True
- supported_extension_aliases = ["provider", "router",
- "binding", "quotas", "security-group",
- "agent", "extraroute", "agent_scheduler"]
+ _supported_extension_aliases = ["provider", "router",
+ "binding", "quotas", "security-group",
+ "agent", "extraroute", "agent_scheduler"]
+
+ @property
+ def supported_extension_aliases(self):
+ if not hasattr(self, '_aliases'):
+ aliases = self._supported_extension_aliases[:]
+ sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ self._aliases = aliases
+ return self._aliases
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
- supported_extension_aliases = ["router", "extraroute", "security-group"]
+ _supported_extension_aliases = ["router", "extraroute", "security-group"]
+
+ @property
+ def supported_extension_aliases(self):
+ if not hasattr(self, '_aliases'):
+ aliases = self._supported_extension_aliases[:]
+ sg_rpc.disable_security_group_extension_if_noop_driver(aliases)
+ self._aliases = aliases
+ return self._aliases
def __init__(self, configfile=None):
db.configure_db()
# under the License.
import mock
-from mock import call
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
+ test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_IPTABLES_DRIVER)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
from quantum.extensions import portbindings
from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_db_plugin as test_plugin
+from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
+
PLUGIN_NAME = ('quantum.plugins.linuxbridge.'
'lb_quantum_plugin.LinuxBridgePluginV2')
pass
-class TestLinuxBridgePortsV2(test_plugin.TestPortsV2,
- LinuxBridgePluginV2TestCase,
- test_bindings.PortBindingsTestCase):
+class TestLinuxBridgeNetworksV2(test_plugin.TestNetworksV2,
+ LinuxBridgePluginV2TestCase):
+ pass
- VIF_TYPE = portbindings.VIF_TYPE_BRIDGE
- HAS_PORT_FILTER = True
+
+class TestLinuxBridgePortsV2(test_plugin.TestPortsV2,
+ LinuxBridgePluginV2TestCase):
def test_update_port_status_build(self):
with self.port() as port:
self.assertEqual(self.port_create_status, 'DOWN')
-class TestLinuxBridgeNetworksV2(test_plugin.TestNetworksV2,
- LinuxBridgePluginV2TestCase):
- pass
+class TestLinuxBridgePortBinding(LinuxBridgePluginV2TestCase,
+ test_bindings.PortBindingsTestCase):
+ VIF_TYPE = portbindings.VIF_TYPE_BRIDGE
+ HAS_PORT_FILTER = True
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_IPTABLES_DRIVER
+
+ def setUp(self):
+ test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ super(TestLinuxBridgePortBinding, self).setUp()
+
+
+class TestLinuxBridgePortBindingNoSG(TestLinuxBridgePortBinding):
+ HAS_PORT_FILTER = False
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
from quantum.extensions import portbindings
from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_db_plugin as test_plugin
+from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = 'quantum.plugins.nec.nec_plugin.NECPluginV2'
pass
-class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase,
- test_bindings.PortBindingsTestCase):
+class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
pass
+
+
+class TestNecPortBinding(test_bindings.PortBindingsTestCase,
+ NecPluginV2TestCase):
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ HAS_PORT_FILTER = True
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
+
+ def setUp(self):
+ test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ super(TestNecPortBinding, self).setUp()
+
+
+class TestNecPortBindingNoSG(TestNecPortBinding):
+ HAS_PORT_FILTER = False
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
+ test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
from quantum.extensions import portbindings
from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_db_plugin as test_plugin
+from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
class OpenvswitchPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
class TestOpenvswitchPortsV2(test_plugin.TestPortsV2,
- OpenvswitchPluginV2TestCase,
- test_bindings.PortBindingsTestCase):
-
- VIF_TYPE = portbindings.VIF_TYPE_OVS
- HAS_PORT_FILTER = True
+ OpenvswitchPluginV2TestCase):
def test_update_port_status_build(self):
with self.port() as port:
class TestOpenvswitchNetworksV2(test_plugin.TestNetworksV2,
OpenvswitchPluginV2TestCase):
pass
+
+
+class TestOpenvswitchPortBinding(OpenvswitchPluginV2TestCase,
+ test_bindings.PortBindingsTestCase):
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ HAS_PORT_FILTER = True
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
+
+ def setUp(self, firewall_driver=None):
+ test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
+ super(TestOpenvswitchPortBinding, self).setUp()
+
+
+class TestOpenvswitchPortBindingNoSG(TestOpenvswitchPortBinding):
+ HAS_PORT_FILTER = False
+ FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
+ test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
+ test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
""" % IPTABLES_ARG
FIREWALL_BASE_PACKAGE = 'quantum.agent.linux.iptables_firewall.'
+FIREWALL_IPTABLES_DRIVER = FIREWALL_BASE_PACKAGE + 'IptablesFirewallDriver'
+FIREWALL_HYBRID_DRIVER = (FIREWALL_BASE_PACKAGE +
+ 'OVSHybridIptablesFirewallDriver')
+FIREWALL_NOOP_DRIVER = 'quantum.agent.firewall.NoopFirewallDriver'
+
+
+def set_firewall_driver(firewall_driver):
+ cfg.CONF.set_override('firewall_driver', firewall_driver,
+ group='SECURITYGROUP')
class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
- FIREWALL_DRIVER = FIREWALL_BASE_PACKAGE + 'IptablesFirewallDriver'
+ FIREWALL_DRIVER = FIREWALL_IPTABLES_DRIVER
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
class TestSecurityGroupAgentWithOVSIptables(
TestSecurityGroupAgentWithIptables):
- FIREWALL_DRIVER = FIREWALL_BASE_PACKAGE + 'OVSHybridIptablesFirewallDriver'
+ FIREWALL_DRIVER = FIREWALL_HYBRID_DRIVER
def _regex(self, value):
#Note(nati): tap is prefixed on the device
return super(
TestSecurityGroupAgentWithOVSIptables,
self)._regex(value)
+
+
+class TestSecurityGroupExtensionControl(base.BaseTestCase):
+ def test_firewall_enabled_noop_driver(self):
+ set_firewall_driver(FIREWALL_NOOP_DRIVER)
+ self.assertFalse(sg_rpc.is_firewall_enabled())
+
+ def test_firewall_enabled_iptables_driver(self):
+ set_firewall_driver(FIREWALL_IPTABLES_DRIVER)
+ self.assertTrue(sg_rpc.is_firewall_enabled())
+
+ def test_disable_security_group_extension_noop_driver(self):
+ set_firewall_driver(FIREWALL_NOOP_DRIVER)
+ exp_aliases = ['dummy1', 'dummy2']
+ ext_aliases = ['dummy1', 'security-group', 'dummy2']
+ sg_rpc.disable_security_group_extension_if_noop_driver(ext_aliases)
+ self.assertEqual(ext_aliases, exp_aliases)
+
+ def test_disable_security_group_extension_iptables_driver(self):
+ set_firewall_driver(FIREWALL_IPTABLES_DRIVER)
+ exp_aliases = ['dummy1', 'security-group', 'dummy2']
+ ext_aliases = ['dummy1', 'security-group', 'dummy2']
+ sg_rpc.disable_security_group_extension_if_noop_driver(ext_aliases)
+ self.assertEqual(ext_aliases, exp_aliases)