]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Support iptables-based security group in NEC plugin
authorAkihiro MOTOKI <motoki@da.jp.nec.com>
Fri, 8 Feb 2013 13:18:33 +0000 (22:18 +0900)
committerAkihiro MOTOKI <motoki@da.jp.nec.com>
Wed, 13 Feb 2013 22:37:52 +0000 (07:37 +0900)
blueprint nec-security-group

This commit also refactors RPC API and callbacks in the plugin and agent
to support security group RPC.

Change-Id: I09d69ca3aff43e0468bbd5df6367de767af27acc

etc/quantum/plugins/nec/nec.ini
quantum/agent/securitygroups_rpc.py
quantum/db/migration/alembic_migrations/versions/3cb5d900c5de_security_groups.py
quantum/plugins/nec/agent/nec_quantum_agent.py
quantum/plugins/nec/db/api.py
quantum/plugins/nec/nec_plugin.py
quantum/tests/unit/nec/test_nec_plugin.py
quantum/tests/unit/nec/test_security_group.py [new file with mode: 0644]

index 3de76e11594c648d78b71756068bfe1c3ee6b7d9..21737c50585e87cd74a5714d38574be3eef169f2 100644 (file)
@@ -39,6 +39,10 @@ polling_interval = 2
 # Change to "sudo" to skip the filtering and just run the comand directly
 root_helper = sudo
 
+[SECURITYGROUP]
+# Firewall driver for realizing quantum security group function
+firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
+
 [OFC]
 # Specify OpenFlow Controller Host, Port and Driver to connect.
 host = 127.0.0.1
index 889d90e65561946a1c16c563ce04d6213b48251d..01b4e110fea667bdb838c12c97be7f1da0718c9f 100644 (file)
@@ -86,9 +86,9 @@ class SecurityGroupAgentRpcMixin(object):
     """
 
     def init_firewall(self):
-        LOG.debug(_("Init firewall settings"))
-        self.firewall = importutils.import_object(
-            cfg.CONF.SECURITYGROUP.firewall_driver)
+        firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
+        LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
+        self.firewall = importutils.import_object(firewall_driver)
 
     def prepare_devices_filter(self, device_ids):
         if not device_ids:
index 2478ff9eb025bdfc4ad5558880e2ca3e5e9e6ce3..59e006537326ac337bf14324aebe8b19396ffe59 100644 (file)
@@ -33,6 +33,7 @@ migration_for_plugins = [
     'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
     'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
     'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
+    'quantum.plugins.nec.nec_plugin.NECPluginV2',
 ]
 
 from alembic import op
index 8c8786b4e204e645cf68a653ae7f3a5f7a4107fd..a170e0db6cb4e986dfb037602c5350fed976a0b9 100755 (executable)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 # @author: Ryota MIBU
+# @author: Akihiro MOTOKI
 
 import socket
 import sys
 import time
 
+import eventlet
+
 from quantum.agent.linux import ovs_lib
+from quantum.agent import rpc as agent_rpc
+from quantum.agent import securitygroups_rpc as sg_rpc
 from quantum.common import config as logging_config
 from quantum.common import topics
-from quantum import context
+from quantum import context as q_context
+from quantum.extensions import securitygroup as ext_sg
 from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
 from quantum.plugins.nec.common import config
 
 
 LOG = logging.getLogger(__name__)
 
 
+class NECPluginApi(agent_rpc.PluginApi):
+    BASE_RPC_API_VERSION = '1.0'
+
+    def update_ports(self, context, agent_id, datapath_id,
+                     port_added, port_removed):
+        """RPC to update information of ports on Quantum Server"""
+        LOG.info(_("Update ports: added=%(added)s, "
+                   "removed=%(removed)s"),
+                 {'added': port_added, 'removed': port_removed})
+        try:
+            self.call(context,
+                      self.make_msg('update_ports',
+                                    topic=topics.AGENT,
+                                    agent_id=agent_id,
+                                    datapath_id=datapath_id,
+                                    port_added=port_added,
+                                    port_removed=port_removed))
+        except Exception as e:
+            LOG.warn(_("update_ports() failed."))
+            return
+
+
+class NECAgentRpcCallback(object):
+
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, context, agent, sg_agent):
+        self.context = context
+        self.agent = agent
+        self.sg_agent = sg_agent
+
+    def port_update(self, context, **kwargs):
+        LOG.debug(_("port_update received: %s"), kwargs)
+        port = kwargs.get('port')
+        # Validate that port is on OVS
+        vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
+        if not vif_port:
+            return
+
+        if ext_sg.SECURITYGROUPS in port:
+            self.sg_agent.refresh_firewall()
+
+
+class SecurityGroupServerRpcApi(proxy.RpcProxy,
+                                sg_rpc.SecurityGroupServerRpcApiMixin):
+
+    def __init__(self, topic):
+        super(SecurityGroupServerRpcApi, self).__init__(
+            topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
+
+
+class SecurityGroupAgentRpcCallback(
+    sg_rpc.SecurityGroupAgentRpcCallbackMixin):
+
+    RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
+
+    def __init__(self, context, sg_agent):
+        self.context = context
+        self.sg_agent = sg_agent
+
+
+class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
+
+    def __init__(self, context):
+        self.context = context
+        self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
+        self.init_firewall()
+
+
 class NECQuantumAgent(object):
 
     def __init__(self, integ_br, root_helper, polling_interval):
@@ -49,36 +125,46 @@ class NECQuantumAgent(object):
         self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
         self.polling_interval = polling_interval
 
+        self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
+        self.setup_rpc()
+
+    def setup_rpc(self):
         self.host = socket.gethostname()
         self.agent_id = 'nec-q-agent.%s' % self.host
-        self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
+        LOG.info(_("RPC agent_id: %s"), self.agent_id)
 
-        # RPC network init
-        self.context = context.get_admin_context_without_session()
-        self.conn = rpc.create_connection(new=True)
+        self.topic = topics.AGENT
+        self.context = q_context.get_admin_context_without_session()
 
-    def update_ports(self, port_added=[], port_removed=[]):
-        """RPC to update information of ports on Quantum Server"""
-        LOG.info(_("Update ports: added=%(port_added)s, "
-                   "removed=%(port_removed)s"),
-                 locals())
-        try:
-            rpc.call(self.context,
-                     topics.PLUGIN,
-                     {'method': 'update_ports',
-                      'args': {'topic': topics.AGENT,
-                               'agent_id': self.agent_id,
-                               'datapath_id': self.datapath_id,
-                               'port_added': port_added,
-                               'port_removed': port_removed}})
-        except Exception as e:
-            LOG.warn(_("update_ports() failed."))
-            return
+        self.plugin_rpc = NECPluginApi(topics.PLUGIN)
+        self.sg_agent = SecurityGroupAgentRpc(self.context)
+
+        # RPC network init
+        # Handle updates from service
+        self.callback_nec = NECAgentRpcCallback(self.context,
+                                                self, self.sg_agent)
+        self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
+                                                         self.sg_agent)
+        self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
+                                                    self.callback_sg])
+        # Define the listening consumer for the agent
+        consumers = [[topics.PORT, topics.UPDATE],
+                     [topics.SECURITY_GROUP, topics.UPDATE]]
+        self.connection = agent_rpc.create_consumers(self.dispatcher,
+                                                     self.topic,
+                                                     consumers)
 
     def _vif_port_to_port_info(self, vif_port):
         return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
                     mac=vif_port.vif_mac)
 
+    def _process_security_group(self, port_added, port_removed):
+        if port_added:
+            devices_added = [p['id'] for p in port_added]
+            self.sg_agent.prepare_devices_filter(devices_added)
+        if port_removed:
+            self.sg_agent.remove_devices_filter(port_removed)
+
     def daemon_loop(self):
         """Main processing loop for NEC Plugin Agent."""
         old_ports = []
@@ -99,7 +185,10 @@ class NECQuantumAgent(object):
                     port_removed.append(port_id)
 
             if port_added or port_removed:
-                self.update_ports(port_added, port_removed)
+                self.plugin_rpc.update_ports(self.context,
+                                             self.agent_id, self.datapath_id,
+                                             port_added, port_removed)
+                self._process_security_group(port_added, port_removed)
             else:
                 LOG.debug(_("No port changed."))
 
@@ -108,6 +197,8 @@ class NECQuantumAgent(object):
 
 
 def main():
+    eventlet.monkey_patch()
+
     config.CONF(project='quantum')
 
     logging_config.setup_logging(config.CONF)
index 7eb822da56e1a0ac090048ba7f5febacebcfebc6..ca2b27656f38bbb70c1941237d647ec04eb90402 100644 (file)
@@ -19,6 +19,10 @@ import sqlalchemy as sa
 
 from quantum.db import api as db
 from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.db import securitygroups_db as sg_db
+from quantum.extensions import securitygroup as ext_sg
+from quantum import manager
 from quantum.openstack.common import log as logging
 # NOTE (e0ne): this import is needed for config init
 from quantum.plugins.nec.common import config
@@ -117,3 +121,29 @@ def del_portinfo(id):
     except sa.orm.exc.NoResultFound:
         LOG.warning(_("del_portinfo(): NotFound portinfo for "
                       "port_id: %s"), id)
+
+
+def get_port_from_device(port_id):
+    """Get port from database"""
+    LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)
+    session = db.get_session()
+    sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
+
+    query = session.query(models_v2.Port,
+                          sg_db.SecurityGroupPortBinding.security_group_id)
+    query = query.outerjoin(sg_db.SecurityGroupPortBinding,
+                            models_v2.Port.id == sg_binding_port)
+    query = query.filter(models_v2.Port.id == port_id)
+    port_and_sgs = query.all()
+    if not port_and_sgs:
+        return None
+    port = port_and_sgs[0][0]
+    plugin = manager.QuantumManager.get_plugin()
+    port_dict = plugin._make_port_dict(port)
+    port_dict[ext_sg.SECURITYGROUPS] = [
+        sg_id for port, sg_id in port_and_sgs if sg_id]
+    port_dict['security_group_rules'] = []
+    port_dict['security_group_source_groups'] = []
+    port_dict['fixed_ips'] = [ip['ip_address']
+                              for ip in port['fixed_ips']]
+    return port_dict
index 552f8ce0588087c0cfd01fdd761c251bd2f26424..0149157c0dd51a4846c5022aebb2c547c8b18206 100644 (file)
@@ -15,6 +15,9 @@
 #    under the License.
 # @author: Ryota MIBU
 
+from quantum.agent import securitygroups_rpc as sg_rpc
+from quantum.common import constants as q_const
+from quantum.common import exceptions as q_exc
 from quantum.common import rpc as q_rpc
 from quantum.common import topics
 from quantum import context
@@ -23,9 +26,12 @@ from quantum.db import l3_db
 from quantum.db import l3_rpc_base
 #NOTE(amotoki): quota_db cannot be removed, it is for db model
 from quantum.db import quota_db
+from quantum.db import securitygroups_rpc_base as sg_db_rpc
 from quantum.extensions import portbindings
+from quantum.extensions import securitygroup as ext_sg
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
 from quantum.plugins.nec.common import config
 from quantum.plugins.nec.common import exceptions as nexc
 from quantum.plugins.nec.db import api as ndb
@@ -51,7 +57,9 @@ class OperationalStatus:
     ERROR = "ERROR"
 
 
-class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
+class NECPluginV2(nec_plugin_base.NECPluginV2Base,
+                  l3_db.L3_NAT_db_mixin,
+                  sg_db_rpc.SecurityGroupServerRpcMixin):
     """NECPluginV2 controls an OpenFlow Controller.
 
     The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
@@ -65,7 +73,8 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
     information to and from the plugin.
     """
 
-    supported_extension_aliases = ["router", "quotas", "binding"]
+    supported_extension_aliases = ["router", "quotas", "binding",
+                                   "security-group"]
 
     binding_view = "extension:port_binding:view"
     binding_set = "extension:port_binding:set"
@@ -81,21 +90,28 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
 
         self.setup_rpc()
 
-    def _check_view_auth(self, context, resource, action):
-        return policy.check(context, action, resource)
-
-    def _enforce_set_auth(self, context, resource, action):
-        policy.enforce(context, action, resource)
-
     def setup_rpc(self):
         self.topic = topics.PLUGIN
         self.conn = rpc.create_connection(new=True)
-        self.callbacks = NECPluginV2RPCCallbacks(self)
-        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
+
+        self.callback_nec = NECPluginV2RPCCallbacks(self)
+        self.callback_dhcp = DhcpRpcCallback()
+        self.callback_l3 = L3RpcCallback()
+        self.callback_sg = SecurityGroupServerRpcCallback()
+        callbacks = [self.callback_nec, self.callback_dhcp,
+                     self.callback_l3, self.callback_sg]
+        self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
         self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
 
+    def _check_view_auth(self, context, resource, action):
+        return policy.check(context, action, resource)
+
+    def _enforce_set_auth(self, context, resource, action):
+        policy.enforce(context, action, resource)
+
     def _update_resource_status(self, context, resource, id, status):
         """Update status of specified resource."""
         request = {}
@@ -199,8 +215,12 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
         """Create a new network entry on DB, and create it on OFC."""
         LOG.debug(_("NECPluginV2.create_network() called, "
                     "network=%s ."), network)
-        session = context.session
-        with session.begin(subtransactions=True):
+        #set up default security groups
+        tenant_id = self._get_tenant_id_for_create(
+            context, network['network'])
+        self._ensure_default_security_group(context, tenant_id)
+
+        with context.session.begin(subtransactions=True):
             new_net = super(NECPluginV2, self).create_network(context, network)
             self._process_l3_create(context, network['network'], new_net['id'])
             self._extend_network_dict_l3(context, new_net)
@@ -337,12 +357,25 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
     def create_port(self, context, port):
         """Create a new port entry on DB, then try to activate it."""
         LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
-        new_port = super(NECPluginV2, self).create_port(context, port)
-        self._update_resource_status(context, "port", new_port['id'],
-                                     OperationalStatus.BUILD)
+        with context.session.begin(subtransactions=True):
+            self._ensure_default_security_group_on_port(context, port)
+            sgids = self._get_security_groups_on_port(context, port)
+            port = super(NECPluginV2, self).create_port(context, port)
+            self._process_port_create_security_group(
+                context, port['id'], sgids)
+            self._extend_port_dict_security_group(context, port)
+        # Note: In order to allow dhcp packets,
+        # changes for dhcp ip should be notifified
+        if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
+            self.notifier.security_groups_provider_updated(context)
+        else:
+            self.notifier.security_groups_member_updated(
+                context, port.get(ext_sg.SECURITYGROUPS))
 
-        self.activate_port_if_ready(context, new_port)
-        return self._extend_port_dict_binding(context, new_port)
+        self._update_resource_status(context, "port", port['id'],
+                                     OperationalStatus.BUILD)
+        self.activate_port_if_ready(context, port)
+        return self._extend_port_dict_binding(context, port)
 
     def update_port(self, context, id, port):
         """Update port, and handle packetfilters associated with the port.
@@ -352,23 +385,37 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
         """
         LOG.debug(_("NECPluginV2.update_port() called, "
                     "id=%(id)s port=%(port)s ."), locals())
-        old_port = super(NECPluginV2, self).get_port(context, id)
-        new_port = super(NECPluginV2, self).update_port(context, id, port)
-
-        changed = (old_port['admin_state_up'] is
-                   not new_port['admin_state_up'])
+        need_port_update_notify = False
+        with context.session.begin(subtransactions=True):
+            old_port = super(NECPluginV2, self).get_port(context, id)
+            new_port = super(NECPluginV2, self).update_port(context, id, port)
+            need_port_update_notify = self.update_security_group_on_port(
+                context, id, port, old_port, new_port)
+
+        need_port_update_notify |= self.is_security_group_member_updated(
+            context, old_port, new_port)
+        if need_port_update_notify:
+            self.notifier.port_update(context, new_port)
+
+        changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
         if changed:
             if new_port['admin_state_up']:
                 self.activate_port_if_ready(context, new_port)
             else:
                 self.deactivate_port(context, old_port)
 
+        # NOTE: _extend_port_dict_security_group() is called in
+        # update_security_group_on_port() above, so we don't need to
+        # call it here.
         return self._extend_port_dict_binding(context, new_port)
 
     def delete_port(self, context, id, l3_port_check=True):
         """Delete port and packet_filters associated with the port."""
         LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
-        port = super(NECPluginV2, self).get_port(context, id)
+        # ext_sg.SECURITYGROUPS attribute for the port is required
+        # since notifier.security_groups_member_updated() need the attribute.
+        # Thus we need to call self.get_port() instead of super().get_port()
+        port = self.get_port(context, id)
 
         self.deactivate_port(context, port)
 
@@ -384,22 +431,27 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
         # and l3-router.  If so, we should prevent deletion.
         if l3_port_check:
             self.prevent_l3_port_deletion(context, id)
-        self.disassociate_floatingips(context, id)
-        super(NECPluginV2, self).delete_port(context, id)
+        with context.session.begin(subtransactions=True):
+            self.disassociate_floatingips(context, id)
+            self._delete_port_security_group_bindings(context, id)
+            super(NECPluginV2, self).delete_port(context, id)
+        self.notifier.security_groups_member_updated(
+            context, port.get(ext_sg.SECURITYGROUPS))
 
     def get_port(self, context, id, fields=None):
-        session = context.session
-        with session.begin(subtransactions=True):
+        with context.session.begin(subtransactions=True):
             port = super(NECPluginV2, self).get_port(context, id, fields)
+            self._extend_port_dict_security_group(context, port)
             self._extend_port_dict_binding(context, port)
         return self._fields(port, fields)
 
     def get_ports(self, context, filters=None, fields=None):
-        session = context.session
-        with session.begin(subtransactions=True):
+        with context.session.begin(subtransactions=True):
             ports = super(NECPluginV2, self).get_ports(context, filters,
                                                        fields)
+            # TODO(amotoki) filter by security group
             for port in ports:
+                self._extend_port_dict_security_group(context, port)
                 self._extend_port_dict_binding(context, port)
         return [self._fields(port, fields) for port in ports]
 
@@ -529,8 +581,52 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
         super(NECPluginV2, self).delete_packet_filter(context, id)
 
 
-class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
-                              l3_rpc_base.L3RpcCallbackMixin):
+class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
+                                  sg_rpc.SecurityGroupAgentRpcApiMixin):
+    '''RPC API for NEC plugin agent'''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(NECPluginV2AgentNotifierApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self.topic_port_update = topics.get_topic_name(
+            topic, topics.PORT, topics.UPDATE)
+
+    def port_update(self, context, port):
+        self.fanout_cast(context,
+                         self.make_msg('port_update',
+                                       port=port),
+                         topic=self.topic_port_update)
+
+
+class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin):
+    # DhcpPluginApi BASE_RPC_API_VERSION
+    RPC_API_VERSION = '1.0'
+
+
+class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin):
+    # L3PluginApi BASE_RPC_API_VERSION
+    RPC_API_VERSION = '1.0'
+
+
+class SecurityGroupServerRpcCallback(
+    sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+
+    RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
+
+    @staticmethod
+    def get_port_from_device(device):
+        port = ndb.get_port_from_device(device)
+        if port:
+            port['device'] = device
+        LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
+                    "device=%(device)s => %(ret)s."),
+                  {'device': device, 'ret': port})
+        return port
+
+
+class NECPluginV2RPCCallbacks(object):
 
     RPC_API_VERSION = '1.0'
 
index 9a06f96d779c14339905ce6bc9b56920bd3e6477..747583ad354f42b1922d8eeee785d3732ac9616e 100644 (file)
@@ -39,7 +39,7 @@ class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase,
                      test_bindings.PortBindingsTestCase):
 
     VIF_TYPE = portbindings.VIF_TYPE_OVS
-    HAS_SECURITY_GROUP = False
+    HAS_PORT_FILTER = True
 
 
 class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
diff --git a/quantum/tests/unit/nec/test_security_group.py b/quantum/tests/unit/nec/test_security_group.py
new file mode 100644 (file)
index 0000000..8b1512e
--- /dev/null
@@ -0,0 +1,88 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013, NEC Corporation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import contextlib
+
+import mock
+
+from quantum.api.v2 import attributes
+from quantum.extensions import securitygroup as ext_sg
+from quantum import manager
+from quantum.plugins.nec.db import api as ndb
+from quantum.tests.unit import test_extension_security_group as test_sg
+from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
+
+PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
+AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
+NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
+
+
+class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
+    _plugin_name = PLUGIN_NAME
+
+    def setUp(self, plugin=None):
+        self.addCleanup(mock.patch.stopall)
+        notifier_p = mock.patch(NOTIFIER)
+        notifier_cls = notifier_p.start()
+        self.notifier = mock.Mock()
+        notifier_cls.return_value = self.notifier
+        self._attribute_map_bk_ = {}
+        for item in attributes.RESOURCE_ATTRIBUTE_MAP:
+            self._attribute_map_bk_[item] = (attributes.
+                                             RESOURCE_ATTRIBUTE_MAP[item].
+                                             copy())
+        super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+
+    def tearDown(self):
+        super(NecSecurityGroupsTestCase, self).tearDown()
+        attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
+
+
+class TestNecSecurityGroups(NecSecurityGroupsTestCase,
+                            test_sg.TestSecurityGroups,
+                            test_sg_rpc.SGNotificationTestMixin):
+
+    def test_security_group_get_port_from_device(self):
+        with contextlib.nested(self.network(),
+                               self.security_group()) as (n, sg):
+            with self.subnet(n):
+                res = self._create_port(self.fmt, n['network']['id'])
+                port = self.deserialize(self.fmt, res)
+                port_id = port['port']['id']
+                sg_id = sg['security_group']['id']
+                fixed_ips = port['port']['fixed_ips']
+
+                data = {'port': {'fixed_ips': fixed_ips,
+                                 'name': port['port']['name'],
+                                 ext_sg.SECURITYGROUPS: [sg_id]}}
+                req = self.new_update_request('ports', data, port_id)
+                res = self.deserialize(self.fmt,
+                                       req.get_response(self.api))
+
+                plugin = manager.QuantumManager.get_plugin()
+                port_dict = plugin.callback_sg.get_port_from_device(port_id)
+                self.assertEqual(port_id, port_dict['id'])
+                self.assertEqual([sg_id],
+                                 port_dict[ext_sg.SECURITYGROUPS])
+                self.assertEqual([], port_dict['security_group_rules'])
+                self.assertEqual([fixed_ips[0]['ip_address']],
+                                 port_dict['fixed_ips'])
+                self._delete('ports', port_id)
+
+
+class TestNecSecurityGroupsXML(TestNecSecurityGroups):
+    fmt = 'xml'