]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Refactor security group rpc call
authorshihanzhang <shihanzhang@huawei.com>
Mon, 4 Aug 2014 08:44:31 +0000 (16:44 +0800)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Tue, 2 Sep 2014 17:43:34 +0000 (19:43 +0200)
Refactor rpc call of 'security_group_rules_for_devices' into
'security_group_info_for_devices' to reduce the response
message size and server side processing time. Includes
agent-side auto detection of rpc version, and fall back
to the old one.

Change-Id: If19be8579ca734a899cdd673c919eee8165aaa0e
Implements: blueprint security-group-rules-for-devices-rpc-call-refactor
DocImpact
Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>
neutron/agent/linux/iptables_firewall.py
neutron/agent/securitygroups_rpc.py
neutron/api/rpc/handlers/securitygroups_rpc.py
neutron/db/securitygroups_rpc_base.py
neutron/tests/unit/test_security_groups_rpc.py

index 20f56cd610580130315bb9a131018a1c98be1bd0..ec37306a5593f97e17f0865e02337ceb3d4d4c7f 100644 (file)
@@ -31,6 +31,8 @@ SPOOF_FILTER = 'spoof-filter'
 CHAIN_NAME_PREFIX = {INGRESS_DIRECTION: 'i',
                      EGRESS_DIRECTION: 'o',
                      SPOOF_FILTER: 's'}
+DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
+                       'egress': 'dest_ip_prefix'}
 LINUX_DEV_LEN = 14
 
 
@@ -48,11 +50,25 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
         self._add_fallback_chain_v4v6()
         self._defer_apply = False
         self._pre_defer_filtered_ports = None
+        # List of security group rules for ports residing on this host
+        self.sg_rules = {}
+        self.pre_sg_rules = None
+        # List of security group member ips for ports residing on this host
+        self.sg_members = {}
+        self.pre_sg_members = None
 
     @property
     def ports(self):
         return self.filtered_ports
 
+    def update_security_group_rules(self, sg_id, sg_rules):
+        LOG.debug("Update rules of security group (%s)", sg_id)
+        self.sg_rules[sg_id] = sg_rules
+
+    def update_security_group_members(self, sg_id, sg_members):
+        LOG.debug("Update members of security group (%s)", sg_id)
+        self.sg_members[sg_id] = sg_members
+
     def prepare_port_filter(self, port):
         LOG.debug(_("Preparing device (%s) filter"), port['device'])
         self._remove_chains()
@@ -250,10 +266,33 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
                              icmp6_type]
         return icmpv6_rules
 
+    def _select_sg_rules_for_port(self, port, direction):
+        sg_ids = port.get('security_groups', [])
+        port_rules = []
+        fixed_ips = port.get('fixed_ips', [])
+        for sg_id in sg_ids:
+            for rule in self.sg_rules.get(sg_id, []):
+                if rule['direction'] == direction:
+                    remote_group_id = rule.get('remote_group_id')
+                    if not remote_group_id:
+                        port_rules.append(rule)
+                        continue
+                    ethertype = rule['ethertype']
+                    for ip in self.sg_members[remote_group_id][ethertype]:
+                        if ip in fixed_ips:
+                            continue
+                        ip_rule = rule.copy()
+                        direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
+                        ip_rule[direction_ip_prefix] = str(
+                            netaddr.IPNetwork(ip).cidr)
+                        port_rules.append(ip_rule)
+        return port_rules
+
     def _add_rule_by_security_group(self, port, direction):
         chain_name = self._port_chain_name(port, direction)
         # select rules for current direction
         security_group_rules = self._select_sgr_by_direction(port, direction)
+        security_group_rules += self._select_sg_rules_for_port(port, direction)
         # split groups by ip version
         # for ipv4, iptables command is used
         # for ipv6, iptables6 command is used
@@ -360,15 +399,43 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
         if not self._defer_apply:
             self.iptables.defer_apply_on()
             self._pre_defer_filtered_ports = dict(self.filtered_ports)
+            self.pre_sg_members = dict(self.sg_members)
+            self.pre_sg_rules = dict(self.sg_rules)
             self._defer_apply = True
 
+    def _remove_unused_security_group_info(self):
+        need_removed_ipset_chains = set()
+        need_removed_security_groups = set()
+        remote_group_ids = set()
+        cur_group_ids = set()
+        for port in self.filtered_ports.values():
+            source_groups = port.get('security_group_source_groups', [])
+            remote_group_ids.update(source_groups)
+            groups = port.get('security_groups', [])
+            cur_group_ids.update(groups)
+
+        need_removed_ipset_chains.update(
+            [x for x in self.pre_sg_members if x not in remote_group_ids])
+        need_removed_security_groups.update(
+            [x for x in self.pre_sg_rules if x not in cur_group_ids])
+        # Remove unused remote security group member ips
+        for remove_chain_id in need_removed_ipset_chains:
+            if remove_chain_id in self.sg_members:
+                self.sg_members.pop(remove_chain_id, None)
+
+        # Remove unused security group rules
+        for remove_group_id in need_removed_security_groups:
+            if remove_group_id in self.sg_rules:
+                self.sg_rules.pop(remove_group_id, None)
+
     def filter_defer_apply_off(self):
         if self._defer_apply:
             self._defer_apply = False
             self._remove_chains_apply(self._pre_defer_filtered_ports)
-            self._pre_defer_filtered_ports = None
             self._setup_chains_apply(self.filtered_ports)
             self.iptables.defer_apply_off()
+            self._remove_unused_security_group_info()
+            self._pre_defer_filtered_ports = None
 
 
 class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
index 49f0b7dabf65f0c6911a650b9cd8f338d28befda..478e0f9e05ed0b3d373b3fae2bf6658524a41866 100644 (file)
 #
 
 from oslo.config import cfg
+from oslo import messaging
 
 from neutron.common import topics
+from neutron.openstack.common.gettextutils import _LW
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 
 LOG = logging.getLogger(__name__)
+# history
+#   1.1 Support Security Group RPC
 SG_RPC_VERSION = "1.1"
 
 security_group_opts = [
@@ -74,13 +78,22 @@ def disable_security_group_extension_by_config(aliases):
 
 class SecurityGroupServerRpcApiMixin(object):
     """A mix-in that enable SecurityGroup support in plugin rpc."""
+
     def security_group_rules_for_devices(self, context, devices):
         LOG.debug(_("Get security group rules "
                     "for devices via rpc %r"), devices)
         return self.call(context,
                          self.make_msg('security_group_rules_for_devices',
                                        devices=devices),
-                         version=SG_RPC_VERSION)
+                         version='1.1')
+
+    def security_group_info_for_devices(self, context, devices):
+        LOG.debug("Get security group information for devices via rpc %r",
+                  devices)
+        return self.call(context,
+                         self.make_msg('security_group_info_for_devices',
+                                       devices=devices),
+                         version='1.2')
 
 
 class SecurityGroupAgentRpcCallbackMixin(object):
@@ -149,16 +162,58 @@ class SecurityGroupAgentRpcMixin(object):
         self.devices_to_refilter = set()
         # Flag raised when a global refresh is needed
         self.global_refresh_firewall = False
+        self._use_enhanced_rpc = None
+
+    @property
+    def use_enhanced_rpc(self):
+        if self._use_enhanced_rpc is None:
+            self._use_enhanced_rpc = (
+                self._check_enhanced_rpc_is_supported_by_server())
+        return self._use_enhanced_rpc
+
+    def _check_enhanced_rpc_is_supported_by_server(self):
+        try:
+            self.plugin_rpc.security_group_info_for_devices(
+                self.context, devices=[])
+        except messaging.UnsupportedVersion:
+            LOG.warning(_LW('security_group_info_for_devices rpc call not '
+                            'supported by the server, falling back to old '
+                            'security_group_rules_for_devices which scales '
+                            'worse.'))
+            return False
+        return True
 
     def prepare_devices_filter(self, device_ids):
         if not device_ids:
             return
         LOG.info(_("Preparing filters for devices %s"), device_ids)
-        devices = self.plugin_rpc.security_group_rules_for_devices(
-            self.context, list(device_ids))
+        if self.use_enhanced_rpc:
+            devices_info = self.plugin_rpc.security_group_info_for_devices(
+                self.context, list(device_ids))
+            devices = devices_info['devices']
+            security_groups = devices_info['security_groups']
+            security_group_member_ips = devices_info['sg_member_ips']
+        else:
+            devices = self.plugin_rpc.security_group_rules_for_devices(
+                self.context, list(device_ids))
+
         with self.firewall.defer_apply():
             for device in devices.values():
                 self.firewall.prepare_port_filter(device)
+            if self.use_enhanced_rpc:
+                LOG.debug("Update security group information for ports %s",
+                          devices.keys())
+                self._update_security_group_info(
+                    security_groups, security_group_member_ips)
+
+    def _update_security_group_info(self, security_groups,
+                                    security_group_member_ips):
+        LOG.debug("Update security group information")
+        for sg_id, sg_rules in security_groups.items():
+            self.firewall.update_security_group_rules(sg_id, sg_rules)
+        for remote_sg_id, member_ips in security_group_member_ips.items():
+            self.firewall.update_security_group_members(
+                remote_sg_id, member_ips)
 
     def security_groups_rule_updated(self, security_groups):
         LOG.info(_("Security group "
@@ -217,12 +272,25 @@ class SecurityGroupAgentRpcMixin(object):
             if not device_ids:
                 LOG.info(_("No ports here to refresh firewall"))
                 return
-        devices = self.plugin_rpc.security_group_rules_for_devices(
-            self.context, device_ids)
+        if self.use_enhanced_rpc:
+            devices_info = self.plugin_rpc.security_group_info_for_devices(
+                self.context, device_ids)
+            devices = devices_info['devices']
+            security_groups = devices_info['security_groups']
+            security_group_member_ips = devices_info['sg_member_ips']
+        else:
+            devices = self.plugin_rpc.security_group_rules_for_devices(
+                self.context, device_ids)
+
         with self.firewall.defer_apply():
             for device in devices.values():
                 LOG.debug(_("Update port filter for %s"), device['device'])
                 self.firewall.update_port_filter(device)
+            if self.use_enhanced_rpc:
+                LOG.debug("Update security group information for ports %s",
+                          devices.keys())
+                self._update_security_group_info(
+                    security_groups, security_group_member_ips)
 
     def firewall_refresh_needed(self):
         return self.global_refresh_firewall or self.devices_to_refilter
index bbba92a16e3d46559163f768e8045438ff4c7725..857e99d8c496966972a308570d045d1999e07186 100644 (file)
@@ -28,15 +28,27 @@ class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
 
     # API version history:
     #   1.1 - Initial version
+    #   1.2 - security_group_info_for_devices introduced as an optimization
 
     # NOTE: RPC_API_VERSION must not be overridden in subclasses
     # to keep RPC API version consistent across plugins.
-    RPC_API_VERSION = '1.1'
+    RPC_API_VERSION = '1.2'
 
     @property
     def plugin(self):
         return manager.NeutronManager.get_plugin()
 
+    def _get_devices_info(self, devices):
+        devices_info = {}
+        for device in devices:
+            port = self.plugin.get_port_from_device(device)
+            if not port:
+                continue
+            if port['device_owner'].startswith('network:'):
+                continue
+            devices_info[port['id']] = port
+        return devices_info
+
     def security_group_rules_for_devices(self, context, **kwargs):
         """Callback method to return security group rules for each port.
 
@@ -46,14 +58,21 @@ class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
         :params devices: list of devices
         :returns: port correspond to the devices with security group rules
         """
-        devices = kwargs.get('devices')
-
-        ports = {}
-        for device in devices:
-            port = self.plugin.get_port_from_device(device)
-            if not port:
-                continue
-            if port['device_owner'].startswith('network:'):
-                continue
-            ports[port['id']] = port
+        devices_info = kwargs.get('devices')
+        ports = self._get_devices_info(devices_info)
         return self.plugin.security_group_rules_for_ports(context, ports)
+
+    def security_group_info_for_devices(self, context, **kwargs):
+        """Return security group information for requested devices.
+
+        :params devices: list of devices
+        :returns:
+        sg_info{
+          'security_groups': {sg_id: [rule1, rule2]}
+          'sg_member_ips': {sg_id: {'IPv4': [], 'IPv6': []}}
+          'devices': {device_id: {device_info}}
+        }
+        """
+        devices_info = kwargs.get('devices')
+        ports = self._get_devices_info(devices_info)
+        return self.plugin.security_group_info_for_ports(context, ports)
index 81d3457f67cd8a7b97ab10ced2525b01b31e4990..6555850dde312389bd65d8f93e0352e8c14bfab2 100644 (file)
@@ -149,6 +149,69 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
             self.notifier.security_groups_member_updated(
                 context, port.get(ext_sg.SECURITYGROUPS))
 
+    def security_group_info_for_ports(self, context, ports):
+        sg_info = {'devices': ports,
+                   'security_groups': {},
+                   'sg_member_ips': {}}
+        rules_in_db = self._select_rules_for_ports(context, ports)
+        remote_security_group_info = {}
+        for (binding, rule_in_db) in rules_in_db:
+            port_id = binding['port_id']
+            remote_gid = rule_in_db.get('remote_group_id')
+            security_group_id = rule_in_db.get('security_group_id')
+            ethertype = rule_in_db['ethertype']
+            if ('security_group_source_groups'
+                not in sg_info['devices'][port_id]):
+                sg_info['devices'][port_id][
+                    'security_group_source_groups'] = []
+
+            if remote_gid:
+                if (remote_gid
+                    not in sg_info['devices'][port_id][
+                        'security_group_source_groups']):
+                    sg_info['devices'][port_id][
+                        'security_group_source_groups'].append(remote_gid)
+                if remote_gid not in remote_security_group_info:
+                    remote_security_group_info[remote_gid] = {}
+                if ethertype not in remote_security_group_info[remote_gid]:
+                    remote_security_group_info[remote_gid][ethertype] = []
+
+            direction = rule_in_db['direction']
+            rule_dict = {
+                'direction': direction,
+                'ethertype': ethertype}
+
+            for key in ('protocol', 'port_range_min', 'port_range_max',
+                        'remote_ip_prefix', 'remote_group_id'):
+                if rule_in_db.get(key):
+                    if key == 'remote_ip_prefix':
+                        direction_ip_prefix = DIRECTION_IP_PREFIX[direction]
+                        rule_dict[direction_ip_prefix] = rule_in_db[key]
+                        continue
+                    rule_dict[key] = rule_in_db[key]
+            if security_group_id not in sg_info['security_groups']:
+                sg_info['security_groups'][security_group_id] = []
+            if rule_dict not in sg_info['security_groups'][security_group_id]:
+                sg_info['security_groups'][security_group_id].append(
+                    rule_dict)
+
+        sg_info['sg_member_ips'] = remote_security_group_info
+        # the provider rules do not belong to any security group, so these
+        # rules still reside in sg_info['devices'] [port_id]
+        self._apply_provider_rule(context, sg_info['devices'])
+
+        return self._get_security_group_member_ips(context, sg_info)
+
+    def _get_security_group_member_ips(self, context, sg_info):
+        ips = self._select_ips_for_remote_group(
+            context, sg_info['sg_member_ips'].keys())
+        for sg_id, member_ips in ips.items():
+            for ip in member_ips:
+                ethertype = 'IPv%d' % netaddr.IPAddress(ip).version
+                if ip not in sg_info['sg_member_ips'][sg_id][ethertype]:
+                    sg_info['sg_member_ips'][sg_id][ethertype].append(ip)
+        return sg_info
+
     def _select_rules_for_ports(self, context, ports):
         if not ports:
             return []
index 695a74487107309d14f57dda20f890b5db8f0051..c841bf322e828ff174c9d725d3d4fa740153904c 100644 (file)
@@ -17,6 +17,7 @@ import contextlib
 
 import mock
 from oslo.config import cfg
+from oslo import messaging
 from testtools import matchers
 import webob.exc
 
@@ -396,6 +397,64 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
                 self._delete('ports', port_id1)
                 self._delete('ports', port_id2)
 
+    def test_security_group_info_for_devices_ipv4_source_group(self):
+
+        with self.network() as n:
+            with contextlib.nested(self.subnet(n),
+                                   self.security_group(),
+                                   self.security_group()) as (subnet_v4,
+                                                              sg1,
+                                                              sg2):
+                sg1_id = sg1['security_group']['id']
+                sg2_id = sg2['security_group']['id']
+                rule1 = self._build_security_group_rule(
+                    sg1_id,
+                    'ingress', const.PROTO_NAME_TCP, '24',
+                    '25', remote_group_id=sg2['security_group']['id'])
+                rules = {
+                    'security_group_rules': [rule1['security_group_rule']]}
+                res = self._create_security_group_rule(self.fmt, rules)
+                self.deserialize(self.fmt, res)
+                self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
+
+                res1 = self._create_port(
+                    self.fmt, n['network']['id'],
+                    security_groups=[sg1_id])
+                ports_rest1 = self.deserialize(self.fmt, res1)
+                port_id1 = ports_rest1['port']['id']
+                self.rpc.devices = {port_id1: ports_rest1['port']}
+                devices = [port_id1, 'no_exist_device']
+
+                res2 = self._create_port(
+                    self.fmt, n['network']['id'],
+                    security_groups=[sg2_id])
+                ports_rest2 = self.deserialize(self.fmt, res2)
+                port_id2 = ports_rest2['port']['id']
+                ctx = context.get_admin_context()
+                ports_rpc = self.rpc.security_group_info_for_devices(
+                    ctx, devices=devices)
+                expected = {
+                    'security_groups': {sg1_id: [
+                        {'direction': 'egress', 'ethertype': const.IPv4},
+                        {'direction': 'egress', 'ethertype': const.IPv6},
+                        {'direction': u'ingress',
+                         'protocol': const.PROTO_NAME_TCP,
+                         'ethertype': const.IPv4,
+                         'port_range_max': 25, 'port_range_min': 24,
+                         'remote_group_id': sg2_id}
+                    ]},
+                    'sg_member_ips': {sg2_id: {
+                        'IPv4': [u'10.0.0.3'],
+                        'IPv6': [],
+                    }}
+                }
+                self.assertEqual(expected['security_groups'],
+                                 ports_rpc['security_groups'])
+                self.assertEqual(expected['sg_member_ips'][sg2_id]['IPv4'],
+                                 ports_rpc['sg_member_ips'][sg2_id]['IPv4'])
+                self._delete('ports', port_id1)
+                self._delete('ports', port_id2)
+
     def test_security_group_rules_for_devices_ipv6_ingress(self):
         fake_prefix = FAKE_PREFIX[const.IPv6]
         fake_gateway = FAKE_IP[const.IPv6]
@@ -877,6 +936,8 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
             'enable_security_group', False,
             group='SECURITYGROUP')
         agent = sg_rpc.SecurityGroupAgentRpcMixin()
+        agent.plugin_rpc = mock.Mock()
+        agent.context = None
         agent.init_firewall()
         self.assertEqual(agent.firewall.__class__.__name__,
                          'NoopFirewallDriver')
@@ -892,13 +953,16 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
         self.agent.context = None
         mock.patch('neutron.agent.linux.iptables_manager').start()
         self.agent.root_helper = 'sudo'
+        rpc = mock.Mock()
+        rpc.security_group_info_for_devices.side_effect = (
+            messaging.UnsupportedVersion('1.2'))
+        self.agent.plugin_rpc = rpc
+
         self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
         self.firewall = mock.Mock()
         firewall_object = firewall_base.FirewallDriver()
         self.firewall.defer_apply.side_effect = firewall_object.defer_apply
         self.agent.firewall = self.firewall
-        rpc = mock.Mock()
-        self.agent.plugin_rpc = rpc
         self.fake_device = {'device': 'fake_device',
                             'security_groups': ['fake_sgid1', 'fake_sgid2'],
                             'security_group_source_groups': ['fake_sgid2'],
@@ -977,6 +1041,141 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
         self.firewall.assert_has_calls([])
 
 
+class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
+    def setUp(self, defer_refresh_firewall=False):
+        super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
+        cfg.CONF.set_default('firewall_driver',
+                             'neutron.agent.firewall.NoopFirewallDriver',
+                             group='SECURITYGROUP')
+        self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
+        self.agent.context = None
+        mock.patch('neutron.agent.linux.iptables_manager').start()
+        self.agent.root_helper = 'sudo'
+        rpc = mock.Mock()
+        self.agent.plugin_rpc = rpc
+        self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
+        self.firewall = mock.Mock()
+        firewall_object = firewall_base.FirewallDriver()
+        self.firewall.defer_apply.side_effect = firewall_object.defer_apply
+        self.agent.firewall = self.firewall
+        self.fake_device = {'device': 'fake_device',
+                            'security_groups': ['fake_sgid1', 'fake_sgid2'],
+                            'security_group_source_groups': ['fake_sgid2'],
+                            'security_group_rules': [{'security_group_id':
+                                                      'fake_sgid1',
+                                                      'remote_group_id':
+                                                      'fake_sgid2'}]}
+        fake_devices = {'fake_device': self.fake_device}
+        fake_sg_info = {
+            'security_groups': {
+                'fake_sgid1': [
+                    {'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
+            'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
+            'devices': fake_devices}
+        self.firewall.ports = fake_devices
+        rpc.security_group_info_for_devices.return_value = fake_sg_info
+
+    def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
+        self.agent.prepare_devices_filter(['fake_device'])
+        self.agent.remove_devices_filter(['fake_device'])
+        # these two mock are too log, just use tmp_mock to replace them
+        tmp_mock1 = mock.call.update_security_group_rules(
+            'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}])
+        tmp_mock2 = mock.call.update_security_group_members(
+            'fake_sgid2', {'IPv4': [], 'IPv6': []})
+        # ignore device which is not filtered
+        self.firewall.assert_has_calls([mock.call.defer_apply(),
+                                        mock.call.prepare_port_filter(
+                                            self.fake_device),
+                                        mock.call.update_security_group_rules(
+                                            'fake_sgid2', []),
+                                        tmp_mock1,
+                                        tmp_mock2,
+                                        mock.call.defer_apply(),
+                                        mock.call.remove_port_filter(
+                                            self.fake_device),
+                                        ])
+
+    def test_security_groups_rule_updated_enhanced_rpc(self):
+        self.agent.refresh_firewall = mock.Mock()
+        self.agent.prepare_devices_filter(['fake_port_id'])
+        self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3'])
+        self.agent.refresh_firewall.assert_called_once_with(
+            [self.fake_device['device']])
+
+    def test_security_groups_rule_not_updated_enhanced_rpc(self):
+        self.agent.refresh_firewall = mock.Mock()
+        self.agent.prepare_devices_filter(['fake_port_id'])
+        self.agent.security_groups_rule_updated(['fake_sgid3', 'fake_sgid4'])
+        self.assertFalse(self.agent.refresh_firewall.called)
+
+    def test_security_groups_member_updated_enhanced_rpc(self):
+        self.agent.refresh_firewall = mock.Mock()
+        self.agent.prepare_devices_filter(['fake_port_id'])
+        self.agent.security_groups_member_updated(
+            ['fake_sgid2', 'fake_sgid3'])
+
+        self.agent.refresh_firewall.assert_called_once_with(
+            [self.fake_device['device']])
+
+    def test_security_groups_member_not_updated_enhanced_rpc(self):
+        self.agent.refresh_firewall = mock.Mock()
+        self.agent.prepare_devices_filter(['fake_port_id'])
+        self.agent.security_groups_member_updated(
+            ['fake_sgid3', 'fake_sgid4'])
+        self.assertFalse(self.agent.refresh_firewall.called)
+
+    def test_security_groups_provider_updated_enhanced_rpc(self):
+        self.agent.refresh_firewall = mock.Mock()
+        self.agent.security_groups_provider_updated()
+        self.agent.refresh_firewall.assert_has_calls(
+            [mock.call.refresh_firewall()])
+
+    def test_refresh_firewall_enhanced_rpc(self):
+        self.agent.prepare_devices_filter(['fake_port_id'])
+        self.agent.refresh_firewall()
+        calls = [mock.call.defer_apply(),
+                 mock.call.prepare_port_filter(self.fake_device),
+                 mock.call.update_security_group_rules('fake_sgid2', []),
+                 mock.call.update_security_group_rules(
+                     'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}]),
+                 mock.call.update_security_group_members(
+                     'fake_sgid2', {'IPv4': [], 'IPv6': []}),
+                 mock.call.defer_apply(),
+                 mock.call.update_port_filter(self.fake_device),
+                 mock.call.update_security_group_rules('fake_sgid2', []),
+                 mock.call.update_security_group_rules(
+                     'fake_sgid1', [{'remote_group_id': 'fake_sgid2'}]),
+                 mock.call.update_security_group_members(
+                     'fake_sgid2', {'IPv4': [], 'IPv6': []})]
+        self.firewall.assert_has_calls(calls)
+
+    def test_refresh_firewall_devices_enhanced_rpc(self):
+        self.agent.prepare_devices_filter(['fake_device'])
+        self.agent.refresh_firewall([self.fake_device])
+        calls = [mock.call.defer_apply(),
+                 mock.call.prepare_port_filter(self.fake_device),
+                 mock.call.update_security_group_rules('fake_sgid2', []),
+                 mock.call.update_security_group_rules('fake_sgid1', [
+                     {'remote_group_id': 'fake_sgid2'}]),
+                 mock.call.update_security_group_members('fake_sgid2', {
+                     'IPv4': [], 'IPv6': []
+                 }),
+                 mock.call.defer_apply(),
+                 mock.call.update_port_filter(self.fake_device),
+                 mock.call.update_security_group_rules('fake_sgid2', []),
+                 mock.call.update_security_group_rules('fake_sgid1', [
+                     {'remote_group_id': 'fake_sgid2'}]),
+                 mock.call.update_security_group_members('fake_sgid2', {
+                     'IPv4': [], 'IPv6': []})
+                 ]
+        self.firewall.assert_has_calls(calls)
+
+    def test_refresh_firewall_none_enhanced_rpc(self):
+        self.agent.refresh_firewall([])
+        self.firewall.assert_has_calls([])
+
+
 class SecurityGroupAgentRpcWithDeferredRefreshTestCase(
     SecurityGroupAgentRpcTestCase):
 
@@ -1198,7 +1397,7 @@ class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
                  {'devices': ['fake_device']},
               'method': 'security_group_rules_for_devices',
               'namespace': None},
-             version=sg_rpc.SG_RPC_VERSION)])
+             version='1.1')])
 
 
 class FakeSGNotifierAPI(n_rpc.RpcProxy,
@@ -1309,8 +1508,8 @@ IPTABLES_FILTER_1 = """# Generated by iptables_manager
 %(physdev_is_bridged)s -j %(bn)s-i_port1
 [0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
 [0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@@ -1319,8 +1518,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port1
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-o_port1
-[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
-RETURN
+[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port1 -j DROP
 [0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@@ -1360,10 +1559,10 @@ IPTABLES_FILTER_1_2 = """# Generated by iptables_manager
 %(physdev_is_bridged)s -j %(bn)s-i_port1
 [0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
 [0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-sg-chain
@@ -1371,8 +1570,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port1
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-o_port1
-[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
-RETURN
+[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port1 -j DROP
 [0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@@ -1416,10 +1615,10 @@ IPTABLES_FILTER_2 = """# Generated by iptables_manager
 %(physdev_is_bridged)s -j %(bn)s-i_port1
 [0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
 [0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-sg-chain
@@ -1427,7 +1626,7 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port1
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-o_port1
-[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 \
+[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
 -j RETURN
 [0:0] -A %(bn)s-s_port1 -j DROP
 [0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
@@ -1443,10 +1642,10 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-i_port2
 [0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
 [0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
 %(physdev_is_bridged)s -j %(bn)s-sg-chain
@@ -1454,7 +1653,7 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port2
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
 %(physdev_is_bridged)s -j %(bn)s-o_port2
-[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 \
+[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
 -j RETURN
 [0:0] -A %(bn)s-s_port2 -j DROP
 [0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
@@ -1497,8 +1696,8 @@ IPTABLES_FILTER_2_2 = """# Generated by iptables_manager
 %(physdev_is_bridged)s -j %(bn)s-i_port1
 [0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
 [0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@@ -1507,8 +1706,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port1
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-o_port1
-[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
-RETURN
+[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port1 -j DROP
 [0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@@ -1523,10 +1722,10 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-i_port2
 [0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
 [0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
 %(physdev_is_bridged)s -j %(bn)s-sg-chain
@@ -1534,8 +1733,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port2
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
 %(physdev_is_bridged)s -j %(bn)s-o_port2
-[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 -j \
-RETURN
+[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port2 -j DROP
 [0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port2 -j %(bn)s-s_port2
@@ -1577,10 +1776,10 @@ IPTABLES_FILTER_2_3 = """# Generated by iptables_manager
 %(physdev_is_bridged)s -j %(bn)s-i_port1
 [0:0] -A %(bn)s-i_port1 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port1 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port1 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port1 -s 10.0.0.4 -j RETURN
+[0:0] -A %(bn)s-i_port1 -s 10.0.0.4/32 -j RETURN
 [0:0] -A %(bn)s-i_port1 -p icmp -j RETURN
 [0:0] -A %(bn)s-i_port1 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port1 \
@@ -1589,8 +1788,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port1
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port1 \
 %(physdev_is_bridged)s -j %(bn)s-o_port1
-[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3 -j \
-RETURN
+[0:0] -A %(bn)s-s_port1 -m mac --mac-source 12:34:56:78:9a:bc -s 10.0.0.3/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port1 -j DROP
 [0:0] -A %(bn)s-o_port1 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port1 -j %(bn)s-s_port1
@@ -1605,10 +1804,10 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-i_port2
 [0:0] -A %(bn)s-i_port2 -m state --state INVALID -j DROP
 [0:0] -A %(bn)s-i_port2 -m state --state RELATED,ESTABLISHED -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.2 -p udp -m udp --sport 67 --dport 68 -j \
-RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.2/32 -p udp -m udp --sport 67 --dport 68 \
+-j RETURN
 [0:0] -A %(bn)s-i_port2 -p tcp -m tcp --dport 22 -j RETURN
-[0:0] -A %(bn)s-i_port2 -s 10.0.0.3 -j RETURN
+[0:0] -A %(bn)s-i_port2 -s 10.0.0.3/32 -j RETURN
 [0:0] -A %(bn)s-i_port2 -p icmp -j RETURN
 [0:0] -A %(bn)s-i_port2 -j %(bn)s-sg-fallback
 [0:0] -A %(bn)s-FORWARD %(physdev_mod)s --physdev-EGRESS tap_port2 \
@@ -1617,8 +1816,8 @@ RETURN
 %(physdev_is_bridged)s -j %(bn)s-o_port2
 [0:0] -A %(bn)s-INPUT %(physdev_mod)s --physdev-EGRESS tap_port2 \
 %(physdev_is_bridged)s -j %(bn)s-o_port2
-[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4 -j \
-RETURN
+[0:0] -A %(bn)s-s_port2 -m mac --mac-source 12:34:56:78:9a:bd -s 10.0.0.4/32 \
+-j RETURN
 [0:0] -A %(bn)s-s_port2 -j DROP
 [0:0] -A %(bn)s-o_port2 -p udp -m udp --sport 68 --dport 67 -j RETURN
 [0:0] -A %(bn)s-o_port2 -j %(bn)s-s_port2
@@ -1832,7 +2031,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
 
         self.root_helper = 'sudo'
         self.agent.root_helper = 'sudo'
-        self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
+        self.rpc = mock.Mock()
+        self.agent.plugin_rpc = self.rpc
+        self.rpc.security_group_info_for_devices.side_effect = (
+            messaging.UnsupportedVersion('1.2'))
+
+        self.agent.init_firewall(
+            defer_refresh_firewall=defer_refresh_firewall)
 
         self.iptables = self.agent.firewall.iptables
         # TODO(jlibosva) Get rid of mocking iptables execute and mock out
@@ -1846,12 +2051,10 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
         self.expected_process_inputs = []
         self.iptables_execute.side_effect = self.iptables_execute_return_values
 
-        self.rpc = mock.Mock()
-        self.agent.plugin_rpc = self.rpc
         rule1 = [{'direction': 'ingress',
                   'protocol': const.PROTO_NAME_UDP,
                   'ethertype': const.IPv4,
-                  'source_ip_prefix': '10.0.0.2',
+                  'source_ip_prefix': '10.0.0.2/32',
                   'source_port_range_min': 67,
                   'source_port_range_max': 67,
                   'port_range_min': 68,
@@ -1865,7 +2068,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
                   'ethertype': const.IPv4}]
         rule2 = rule1[:]
         rule2 += [{'direction': 'ingress',
-                  'source_ip_prefix': '10.0.0.4',
+                  'source_ip_prefix': '10.0.0.4/32',
                   'ethertype': const.IPv4}]
         rule3 = rule2[:]
         rule3 += [{'direction': 'ingress',
@@ -1873,30 +2076,31 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
                   'ethertype': const.IPv4}]
         rule4 = rule1[:]
         rule4 += [{'direction': 'ingress',
-                  'source_ip_prefix': '10.0.0.3',
+                  'source_ip_prefix': '10.0.0.3/32',
                   'ethertype': const.IPv4}]
         rule5 = rule4[:]
         rule5 += [{'direction': 'ingress',
                   'protocol': const.PROTO_NAME_ICMP,
                   'ethertype': const.IPv4}]
+
         self.devices1 = {'tap_port1': self._device('tap_port1',
-                                                   '10.0.0.3',
+                                                   '10.0.0.3/32',
                                                    '12:34:56:78:9a:bc',
                                                    rule1)}
         self.devices2 = {'tap_port1': self._device('tap_port1',
-                                                   '10.0.0.3',
+                                                   '10.0.0.3/32',
                                                    '12:34:56:78:9a:bc',
                                                    rule2),
                          'tap_port2': self._device('tap_port2',
-                                                   '10.0.0.4',
+                                                   '10.0.0.4/32',
                                                    '12:34:56:78:9a:bd',
                                                    rule4)}
         self.devices3 = {'tap_port1': self._device('tap_port1',
-                                                   '10.0.0.3',
+                                                   '10.0.0.3/32',
                                                    '12:34:56:78:9a:bc',
                                                    rule3),
                          'tap_port2': self._device('tap_port2',
-                                                   '10.0.0.4',
+                                                   '10.0.0.4/32',
                                                    '12:34:56:78:9a:bd',
                                                    rule5)}
 
@@ -2008,6 +2212,121 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
         self._verify_mock_calls()
 
 
+class TestSecurityGroupAgentEnhancedRpcWithIptables(
+    TestSecurityGroupAgentWithIptables):
+    def setUp(self, defer_refresh_firewall=False):
+        super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
+            defer_refresh_firewall)
+        self.rpc = mock.Mock()
+        self.agent.plugin_rpc = self.rpc
+        self.sg_info = self.rpc.security_group_info_for_devices
+        self.agent.init_firewall(
+            defer_refresh_firewall=defer_refresh_firewall)
+        self.iptables = self.agent.firewall.iptables
+        self.iptables_execute = mock.patch.object(self.iptables,
+                                                  "execute").start()
+        self.iptables_execute_return_values = []
+        self.expected_call_count = 0
+        self.expected_calls = []
+        self.expected_process_inputs = []
+        self.iptables_execute.side_effect = (
+            self.iptables_execute_return_values)
+
+        rule1 = [{'direction': 'ingress',
+                  'protocol': const.PROTO_NAME_UDP,
+                  'ethertype': const.IPv4,
+                  'source_ip_prefix': '10.0.0.2/32',
+                  'source_port_range_min': 67,
+                  'source_port_range_max': 67,
+                  'port_range_min': 68,
+                  'port_range_max': 68},
+                 {'direction': 'ingress',
+                  'protocol': const.PROTO_NAME_TCP,
+                  'ethertype': const.IPv4,
+                  'port_range_min': 22,
+                  'port_range_max': 22},
+                 {'direction': 'egress',
+                  'ethertype': const.IPv4},
+                 {'direction': 'ingress',
+                  'remote_group_id': 'security_group1',
+                  'ethertype': const.IPv4}]
+        rule2 = rule1[:]
+        rule2 += [{'direction': 'ingress',
+                  'protocol': const.PROTO_NAME_ICMP,
+                  'ethertype': const.IPv4}]
+
+        devices_info1 = {'tap_port1': self._device('tap_port1',
+                                                   '10.0.0.3/32',
+                                                   '12:34:56:78:9a:bc',
+                                                   [])}
+        self.devices_info1 = {'security_groups': {'security_group1': rule1},
+                         'sg_member_ips': {
+                             'security_group1': {
+                                 'IPv4': ['10.0.0.3/32'], 'IPv6': []}},
+                         'devices': devices_info1}
+        devices_info2 = {'tap_port1': self._device('tap_port1',
+                                                   '10.0.0.3/32',
+                                                   '12:34:56:78:9a:bc',
+                                                   []),
+                         'tap_port2': self._device('tap_port2',
+                                                   '10.0.0.4/32',
+                                                   '12:34:56:78:9a:bd',
+                                                   [])}
+        self.devices_info2 = {'security_groups': {'security_group1': rule1},
+                         'sg_member_ips': {
+                             'security_group1': {
+                                 'IPv4': ['10.0.0.3/32', '10.0.0.4/32'],
+                                 'IPv6': []}},
+                         'devices': devices_info2}
+        self.devices_info3 = {'security_groups': {'security_group1': rule2},
+                         'sg_member_ips': {
+                             'security_group1': {
+                                 'IPv4': ['10.0.0.3/32', '10.0.0.4/32'],
+                                 'IPv6': []}},
+                         'devices': devices_info2}
+
+    def test_prepare_remove_port(self):
+        self.sg_info.return_value = self.devices_info1
+        self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
+        self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY)
+
+        self.agent.prepare_devices_filter(['tap_port1'])
+        self.agent.remove_devices_filter(['tap_port1'])
+
+        self._verify_mock_calls()
+
+    def test_security_group_member_updated(self):
+        self.sg_info.return_value = self.devices_info1
+        self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
+        self._replay_iptables(IPTABLES_FILTER_1_2, IPTABLES_FILTER_V6_1)
+        self._replay_iptables(IPTABLES_FILTER_2, IPTABLES_FILTER_V6_2)
+        self._replay_iptables(IPTABLES_FILTER_2_2, IPTABLES_FILTER_V6_2)
+        self._replay_iptables(IPTABLES_FILTER_1, IPTABLES_FILTER_V6_1)
+        self._replay_iptables(IPTABLES_FILTER_EMPTY, IPTABLES_FILTER_V6_EMPTY)
+
+        self.agent.prepare_devices_filter(['tap_port1'])
+        self.sg_info.return_value = self.devices_info2
+        self.agent.security_groups_member_updated(['security_group1'])
+        self.agent.prepare_devices_filter(['tap_port2'])
+        self.sg_info.return_value = self.devices_info1
+        self.agent.security_groups_member_updated(['security_group1'])
+        self.agent.remove_devices_filter(['tap_port2'])
+        self.agent.remove_devices_filter(['tap_port1'])
+
+        self._verify_mock_calls()
+
+    def test_security_group_rule_updated(self):
+        self.sg_info.return_value = self.devices_info2
+        self._replay_iptables(IPTABLES_FILTER_2, IPTABLES_FILTER_V6_2)
+        self._replay_iptables(IPTABLES_FILTER_2_3, IPTABLES_FILTER_V6_2)
+
+        self.agent.prepare_devices_filter(['tap_port1', 'tap_port3'])
+        self.sg_info.return_value = self.devices_info3
+        self.agent.security_groups_rule_updated(['security_group1'])
+
+        self._verify_mock_calls()
+
+
 class SGNotificationTestMixin():
     def test_security_group_rule_updated(self):
         name = 'webservers'