support in agent implementations.
"""
- def init_firewall(self):
+ def init_firewall(self, defer_refresh_firewall=False):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
self.firewall = importutils.import_object(firewall_driver)
+ # The following flag will be set to true if port filter must not be
+ # applied as soon as a rule or membership notification is received
+ self.defer_refresh_firewall = defer_refresh_firewall
+ # Stores devices for which firewall should be refreshed when
+ # deferred refresh is enabled.
+ self.devices_to_refilter = set()
+ # Flag raised when a global refresh is needed
+ self.global_refresh_firewall = False
def prepare_devices_filter(self, device_ids):
if not device_ids:
sec_grp_set = set(security_groups)
for device in self.firewall.ports.values():
if sec_grp_set & set(device.get(attribute, [])):
- devices.append(device)
-
- if devices:
+ devices.append(device['device'])
+ if self.defer_refresh_firewall:
+ LOG.debug(_("Adding %s devices to the list of devices "
+ "for which firewall needs to be refreshed"),
+ devices)
+ self.devices_to_refilter |= set(devices)
+ elif devices:
self.refresh_firewall(devices)
def security_groups_provider_updated(self):
LOG.info(_("Provider rule updated"))
- self.refresh_firewall()
+ if self.defer_refresh_firewall:
+ # NOTE(salv-orlando): A 'global refresh' might not be
+ # necessary if the subnet for which the provider rules
+ # were updated is known
+ self.global_refresh_firewall = True
+ else:
+ self.refresh_firewall()
def remove_devices_filter(self, device_ids):
if not device_ids:
continue
self.firewall.remove_port_filter(device)
- def refresh_firewall(self, devices=None):
+ def refresh_firewall(self, device_ids=None):
LOG.info(_("Refresh firewall rules"))
-
- if devices:
- device_ids = [d['device'] for d in devices]
- else:
- device_ids = self.firewall.ports.keys()
if not device_ids:
- LOG.info(_("No ports here to refresh firewall"))
- return
+ device_ids = self.firewall.ports.keys()
+ if not device_ids:
+ LOG.info(_("No ports here to refresh firewall"))
+ return
devices = self.plugin_rpc.security_group_rules_for_devices(
self.context, device_ids)
with self.firewall.defer_apply():
LOG.debug(_("Update port filter for %s"), device['device'])
self.firewall.update_port_filter(device)
+ def firewall_refresh_needed(self):
+ return self.global_refresh_firewall or self.devices_to_refilter
+
+ def setup_port_filters(self, new_devices, updated_devices):
+ """Configure port filters for devices.
+
+ This routine applies filters for new devices and refreshes firewall
+ rules when devices have been updated, or when there are changes in
+ security group membership or rules.
+
+ :param new_devices: set containing identifiers for new devices
+ :param updated_devices: set containining identifiers for
+ updated devices
+ """
+ if new_devices:
+ LOG.debug(_("Preparing device filters for %d new devices"),
+ len(new_devices))
+ self.prepare_devices_filter(new_devices)
+ # These data structures are cleared here in order to avoid
+ # losing updates occurring during firewall refresh
+ devices_to_refilter = self.devices_to_refilter
+ global_refresh_firewall = self.global_refresh_firewall
+ self.devices_to_refilter = set()
+ self.global_refresh_firewall = False
+ # TODO(salv-orlando): Avoid if possible ever performing the global
+ # refresh providing a precise list of devices for which firewall
+ # should be refreshed
+ if global_refresh_firewall:
+ LOG.debug(_("Refreshing firewall for all filtered devices"))
+ self.refresh_firewall()
+ else:
+ # If a device is both in new and updated devices
+ # avoid reprocessing it
+ updated_devices = ((updated_devices | devices_to_refilter) -
+ new_devices)
+ if updated_devices:
+ LOG.debug(_("Refreshing firewall for %d devices"),
+ len(updated_devices))
+ self.refresh_firewall(updated_devices)
+
class SecurityGroupAgentRpcApiMixin(object):
# License for the specific language governing permissions and limitations
# under the License.
+from contextlib import contextmanager
from contextlib import nested
import mock
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
- def setUp(self):
+ def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
self.addCleanup(mock.patch.stopall)
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
- self.agent.init_firewall()
+ self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
- [call.refresh_firewall([self.fake_device])])
+ [call.refresh_firewall([self.fake_device['device']])])
def test_security_groups_rule_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.agent.prepare_devices_filter(['fake_port_id'])
self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
self.agent.refresh_firewall.assert_has_calls(
- [call.refresh_firewall([self.fake_device])])
+ [call.refresh_firewall([self.fake_device['device']])])
def test_security_groups_member_not_updated(self):
self.agent.refresh_firewall = mock.Mock()
self.firewall.assert_has_calls([])
+class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
+ SecurityGroupAgentRpcTestCase):
+
+ def setUp(self):
+ super(SecurityGroupAgentRpcWithDeferredRefreshTestCase, self).setUp(
+ defer_refresh_firewall=True)
+
+ @contextmanager
+ def add_fake_device(self, device, sec_groups, source_sec_groups=None):
+ fake_device = {'device': device,
+ 'security_groups': sec_groups,
+ 'security_group_source_groups': source_sec_groups or [],
+ 'security_group_rules': [{'security_group_id':
+ 'fake_sgid1',
+ 'remote_group_id':
+ 'fake_sgid2'}]}
+ self.firewall.ports[device] = fake_device
+ yield
+ del self.firewall.ports[device]
+
+ def test_security_groups_rule_updated(self):
+ self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_rule_updated_same_port(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgidX']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1'])
+ self.agent.security_groups_rule_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_rule_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid2']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1',
+ 'fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_rule_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid2']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_rule_updated(['fake_sgid1'])
+ self.agent.security_groups_rule_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_member_updated(self):
+ self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_member_updated_same_port(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgidX']):
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.security_groups_member_updated(['fake_sgid1',
+ 'fake_sgid3'])
+ self.agent.security_groups_member_updated(['fake_sgid2',
+ 'fake_sgid3'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertNotIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_member_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgid2']):
+ self.agent.security_groups_member_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_multiple_security_groups_member_updated_multiple_ports(self):
+ with self.add_fake_device(device='fake_device_2',
+ sec_groups=['fake_sgid1', 'fake_sgid1B'],
+ source_sec_groups=['fake_sgid1B']):
+ self.agent.security_groups_member_updated(['fake_sgid1B'])
+ self.agent.security_groups_member_updated(['fake_sgid2'])
+ self.assertIn('fake_device', self.agent.devices_to_refilter)
+ self.assertIn('fake_device_2', self.agent.devices_to_refilter)
+
+ def test_security_groups_provider_updated(self):
+ self.agent.security_groups_provider_updated()
+ self.assertTrue(self.agent.global_refresh_firewall)
+
+ def test_setup_port_filters_new_ports_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.assertFalse(self.agent.refresh_firewall.called)
+
+ def test_setup_port_filters_updated_ports_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set(['fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_updated_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filter_new_and_updated_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']),
+ set(['fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_updated_device']))
+
+ def test_setup_port_filters_sg_updates_only(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_sg_updates_and_new_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(['fake_new_device']), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device']))
+
+ def test_setup_port_filters_sg_updates_and_updated_ports(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(
+ set(), set(['fake_device', 'fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device', 'fake_device_2', 'fake_updated_device']))
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_all_updates(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2'])
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(
+ set(['fake_new_device']),
+ set(['fake_device', 'fake_updated_device']))
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.prepare_devices_filter.assert_called_once_with(
+ set(['fake_new_device']))
+ self.agent.refresh_firewall.assert_called_once_with(
+ set(['fake_device', 'fake_device_2', 'fake_updated_device']))
+
+ def test_setup_port_filters_no_update(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = False
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.assertFalse(self.agent.refresh_firewall.called)
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+ def test_setup_port_filters_with_global_refresh(self):
+ self.agent.prepare_devices_filter = mock.Mock()
+ self.agent.refresh_firewall = mock.Mock()
+ self.agent.devices_to_refilter = set()
+ self.agent.global_refresh_firewall = True
+ self.agent.setup_port_filters(set(), set())
+ self.assertFalse(self.agent.devices_to_refilter)
+ self.assertFalse(self.agent.global_refresh_firewall)
+ self.agent.refresh_firewall.assert_called_once_with()
+ self.assertFalse(self.agent.prepare_devices_filter.called)
+
+
class FakeSGRpcApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
pass
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
- def setUp(self):
+ def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentWithIptables, self).setUp()
cfg.CONF.set_override(
'firewall_driver',
self.root_helper = 'sudo'
self.agent.root_helper = 'sudo'
- self.agent.init_firewall()
+ self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
self.iptables_execute = mock.patch.object(self.iptables,