# under the License.
#
+import functools
+
from oslo.config import cfg
from oslo import messaging
+from neutron.agent import firewall
from neutron.common import topics
-from neutron.openstack.common.gettextutils import _LW
+from neutron.openstack.common.gettextutils import _LI, _LW
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
def disable_security_group_extension_by_config(aliases):
if not is_firewall_enabled():
- LOG.info(_('Disabled security-group extension.'))
+ LOG.info(_LI('Disabled security-group extension.'))
_disable_extension('security-group', aliases)
- LOG.info(_('Disabled allowed-address-pairs extension.'))
+ LOG.info(_LI('Disabled allowed-address-pairs extension.'))
_disable_extension('allowed-address-pairs', aliases)
return False
return True
+ def skip_if_noopfirewall_or_firewall_disabled(func):
+ @functools.wraps(func)
+ def decorated_function(self, *args, **kwargs):
+ if (isinstance(self.firewall, firewall.NoopFirewallDriver) or
+ not is_firewall_enabled()):
+ LOG.info(_LI("Skipping method %s as firewall is disabled "
+ "or configured as NoopFirewallDriver."),
+ func.__name__)
+ else:
+ return func(self, *args, **kwargs)
+ return decorated_function
+
+ @skip_if_noopfirewall_or_firewall_disabled
def prepare_devices_filter(self, device_ids):
if not device_ids:
return
- LOG.info(_("Preparing filters for devices %s"), device_ids)
+ LOG.info(_LI("Preparing filters for devices %s"), device_ids)
if self.use_enhanced_rpc:
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.context, list(device_ids))
remote_sg_id, member_ips)
def security_groups_rule_updated(self, security_groups):
- LOG.info(_("Security group "
- "rule updated %r"), security_groups)
+ LOG.info(_LI("Security group "
+ "rule updated %r"), security_groups)
self._security_group_updated(
security_groups,
'security_groups')
def security_groups_member_updated(self, security_groups):
- LOG.info(_("Security group "
- "member updated %r"), security_groups)
+ LOG.info(_LI("Security group "
+ "member updated %r"), security_groups)
self._security_group_updated(
security_groups,
'security_group_source_groups')
self.refresh_firewall(devices)
def security_groups_provider_updated(self):
- LOG.info(_("Provider rule updated"))
+ LOG.info(_LI("Provider rule updated"))
if self.defer_refresh_firewall:
# NOTE(salv-orlando): A 'global refresh' might not be
# necessary if the subnet for which the provider rules
def remove_devices_filter(self, device_ids):
if not device_ids:
return
- LOG.info(_("Remove device filter for %r"), device_ids)
+ LOG.info(_LI("Remove device filter for %r"), device_ids)
with self.firewall.defer_apply():
for device_id in device_ids:
device = self.firewall.ports.get(device_id)
continue
self.firewall.remove_port_filter(device)
+ @skip_if_noopfirewall_or_firewall_disabled
def refresh_firewall(self, device_ids=None):
- LOG.info(_("Refresh firewall rules"))
+ LOG.info(_LI("Refresh firewall rules"))
if not device_ids:
device_ids = self.firewall.ports.keys()
if not device_ids:
- LOG.info(_("No ports here to refresh firewall"))
+ LOG.info(_LI("No ports here to refresh firewall"))
return
if self.use_enhanced_rpc:
devices_info = self.plugin_rpc.security_group_info_for_devices(
self.agent.root_helper = 'sudo'
self.agent.plugin_rpc = mock.Mock()
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
+ self.default_firewall = self.agent.firewall
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
self.fake_device),
])
+ def test_prepare_devices_filter_with_noopfirewall(self):
+ self.agent.firewall = self.default_firewall
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.prepare_devices_filter(['fake_device'])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+
+ def test_prepare_devices_filter_with_firewall_disabled(self):
+ cfg.CONF.set_override('enable_security_group', False, 'SECURITYGROUP')
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.prepare_devices_filter(['fake_device'])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+
def test_security_groups_rule_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.refresh_firewall([])
self.assertFalse(self.firewall.called)
+ def test_refresh_firewall_with_firewall_disabled(self):
+ cfg.CONF.set_override('enable_security_group', False, 'SECURITYGROUP')
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.firewall.defer_apply = mock.Mock()
+ self.agent.refresh_firewall([self.fake_device])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+ self.assertFalse(self.agent.firewall.defer_apply.called)
+
+ def test_refresh_firewall_with_noopfirewall(self):
+ self.agent.firewall = self.default_firewall
+ self.agent.plugin_rpc.security_group_info_for_devices = mock.Mock()
+ self.agent.plugin_rpc.security_group_rules_for_devices = mock.Mock()
+ self.agent.firewall.defer_apply = mock.Mock()
+ self.agent.refresh_firewall([self.fake_device])
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_info_for_devices.called)
+ self.assertFalse(self.agent.plugin_rpc.
+ security_group_rules_for_devices.called)
+ self.assertFalse(self.agent.firewall.defer_apply.called)
+
class SecurityGroupAgentEnhancedRpcTestCase(
BaseSecurityGroupAgentRpcTestCase):