# License for the specific language governing permissions and limitations
# under the License.
# @author: Ryota MIBU
+# @author: Akihiro MOTOKI
import socket
import sys
import time
+import eventlet
+
from quantum.agent.linux import ovs_lib
+from quantum.agent import rpc as agent_rpc
+from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
from quantum.common import topics
-from quantum import context
+from quantum import context as q_context
+from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config
LOG = logging.getLogger(__name__)
+class NECPluginApi(agent_rpc.PluginApi):
+ BASE_RPC_API_VERSION = '1.0'
+
+ def update_ports(self, context, agent_id, datapath_id,
+ port_added, port_removed):
+ """RPC to update information of ports on Quantum Server"""
+ LOG.info(_("Update ports: added=%(added)s, "
+ "removed=%(removed)s"),
+ {'added': port_added, 'removed': port_removed})
+ try:
+ self.call(context,
+ self.make_msg('update_ports',
+ topic=topics.AGENT,
+ agent_id=agent_id,
+ datapath_id=datapath_id,
+ port_added=port_added,
+ port_removed=port_removed))
+ except Exception as e:
+ LOG.warn(_("update_ports() failed."))
+ return
+
+
+class NECAgentRpcCallback(object):
+
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, context, agent, sg_agent):
+ self.context = context
+ self.agent = agent
+ self.sg_agent = sg_agent
+
+ def port_update(self, context, **kwargs):
+ LOG.debug(_("port_update received: %s"), kwargs)
+ port = kwargs.get('port')
+ # Validate that port is on OVS
+ vif_port = self.agent.int_br.get_vif_port_by_id(port['id'])
+ if not vif_port:
+ return
+
+ if ext_sg.SECURITYGROUPS in port:
+ self.sg_agent.refresh_firewall()
+
+
+class SecurityGroupServerRpcApi(proxy.RpcProxy,
+ sg_rpc.SecurityGroupServerRpcApiMixin):
+
+ def __init__(self, topic):
+ super(SecurityGroupServerRpcApi, self).__init__(
+ topic=topic, default_version=sg_rpc.SG_RPC_VERSION)
+
+
+class SecurityGroupAgentRpcCallback(
+ sg_rpc.SecurityGroupAgentRpcCallbackMixin):
+
+ RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
+
+ def __init__(self, context, sg_agent):
+ self.context = context
+ self.sg_agent = sg_agent
+
+
+class SecurityGroupAgentRpc(sg_rpc.SecurityGroupAgentRpcMixin):
+
+ def __init__(self, context):
+ self.context = context
+ self.plugin_rpc = SecurityGroupServerRpcApi(topics.PLUGIN)
+ self.init_firewall()
+
+
class NECQuantumAgent(object):
def __init__(self, integ_br, root_helper, polling_interval):
self.int_br = ovs_lib.OVSBridge(integ_br, root_helper)
self.polling_interval = polling_interval
+ self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
+ self.setup_rpc()
+
+ def setup_rpc(self):
self.host = socket.gethostname()
self.agent_id = 'nec-q-agent.%s' % self.host
- self.datapath_id = "0x%s" % self.int_br.get_datapath_id()
+ LOG.info(_("RPC agent_id: %s"), self.agent_id)
- # RPC network init
- self.context = context.get_admin_context_without_session()
- self.conn = rpc.create_connection(new=True)
+ self.topic = topics.AGENT
+ self.context = q_context.get_admin_context_without_session()
- def update_ports(self, port_added=[], port_removed=[]):
- """RPC to update information of ports on Quantum Server"""
- LOG.info(_("Update ports: added=%(port_added)s, "
- "removed=%(port_removed)s"),
- locals())
- try:
- rpc.call(self.context,
- topics.PLUGIN,
- {'method': 'update_ports',
- 'args': {'topic': topics.AGENT,
- 'agent_id': self.agent_id,
- 'datapath_id': self.datapath_id,
- 'port_added': port_added,
- 'port_removed': port_removed}})
- except Exception as e:
- LOG.warn(_("update_ports() failed."))
- return
+ self.plugin_rpc = NECPluginApi(topics.PLUGIN)
+ self.sg_agent = SecurityGroupAgentRpc(self.context)
+
+ # RPC network init
+ # Handle updates from service
+ self.callback_nec = NECAgentRpcCallback(self.context,
+ self, self.sg_agent)
+ self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
+ self.sg_agent)
+ self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
+ self.callback_sg])
+ # Define the listening consumer for the agent
+ consumers = [[topics.PORT, topics.UPDATE],
+ [topics.SECURITY_GROUP, topics.UPDATE]]
+ self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.topic,
+ consumers)
def _vif_port_to_port_info(self, vif_port):
return dict(id=vif_port.vif_id, port_no=vif_port.ofport,
mac=vif_port.vif_mac)
+ def _process_security_group(self, port_added, port_removed):
+ if port_added:
+ devices_added = [p['id'] for p in port_added]
+ self.sg_agent.prepare_devices_filter(devices_added)
+ if port_removed:
+ self.sg_agent.remove_devices_filter(port_removed)
+
def daemon_loop(self):
"""Main processing loop for NEC Plugin Agent."""
old_ports = []
port_removed.append(port_id)
if port_added or port_removed:
- self.update_ports(port_added, port_removed)
+ self.plugin_rpc.update_ports(self.context,
+ self.agent_id, self.datapath_id,
+ port_added, port_removed)
+ self._process_security_group(port_added, port_removed)
else:
LOG.debug(_("No port changed."))
def main():
+ eventlet.monkey_patch()
+
config.CONF(project='quantum')
logging_config.setup_logging(config.CONF)
# under the License.
# @author: Ryota MIBU
+from quantum.agent import securitygroups_rpc as sg_rpc
+from quantum.common import constants as q_const
+from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum import context
from quantum.db import l3_rpc_base
#NOTE(amotoki): quota_db cannot be removed, it is for db model
from quantum.db import quota_db
+from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
+from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
from quantum.plugins.nec.common import config
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
ERROR = "ERROR"
-class NECPluginV2(nec_plugin_base.NECPluginV2Base, l3_db.L3_NAT_db_mixin):
+class NECPluginV2(nec_plugin_base.NECPluginV2Base,
+ l3_db.L3_NAT_db_mixin,
+ sg_db_rpc.SecurityGroupServerRpcMixin):
"""NECPluginV2 controls an OpenFlow Controller.
The Quantum NECPluginV2 maps L2 logical networks to L2 virtualized networks
information to and from the plugin.
"""
- supported_extension_aliases = ["router", "quotas", "binding"]
+ supported_extension_aliases = ["router", "quotas", "binding",
+ "security-group"]
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
self.setup_rpc()
- def _check_view_auth(self, context, resource, action):
- return policy.check(context, action, resource)
-
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def setup_rpc(self):
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
- self.callbacks = NECPluginV2RPCCallbacks(self)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
+
+ self.callback_nec = NECPluginV2RPCCallbacks(self)
+ self.callback_dhcp = DhcpRpcCallback()
+ self.callback_l3 = L3RpcCallback()
+ self.callback_sg = SecurityGroupServerRpcCallback()
+ callbacks = [self.callback_nec, self.callback_dhcp,
+ self.callback_l3, self.callback_sg]
+ self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
+ def _check_view_auth(self, context, resource, action):
+ return policy.check(context, action, resource)
+
+ def _enforce_set_auth(self, context, resource, action):
+ policy.enforce(context, action, resource)
+
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
request = {}
"""Create a new network entry on DB, and create it on OFC."""
LOG.debug(_("NECPluginV2.create_network() called, "
"network=%s ."), network)
- session = context.session
- with session.begin(subtransactions=True):
+ #set up default security groups
+ tenant_id = self._get_tenant_id_for_create(
+ context, network['network'])
+ self._ensure_default_security_group(context, tenant_id)
+
+ with context.session.begin(subtransactions=True):
new_net = super(NECPluginV2, self).create_network(context, network)
self._process_l3_create(context, network['network'], new_net['id'])
self._extend_network_dict_l3(context, new_net)
def create_port(self, context, port):
"""Create a new port entry on DB, then try to activate it."""
LOG.debug(_("NECPluginV2.create_port() called, port=%s ."), port)
- new_port = super(NECPluginV2, self).create_port(context, port)
- self._update_resource_status(context, "port", new_port['id'],
- OperationalStatus.BUILD)
+ with context.session.begin(subtransactions=True):
+ self._ensure_default_security_group_on_port(context, port)
+ sgids = self._get_security_groups_on_port(context, port)
+ port = super(NECPluginV2, self).create_port(context, port)
+ self._process_port_create_security_group(
+ context, port['id'], sgids)
+ self._extend_port_dict_security_group(context, port)
+ # Note: In order to allow dhcp packets,
+ # changes for dhcp ip should be notifified
+ if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
+ self.notifier.security_groups_provider_updated(context)
+ else:
+ self.notifier.security_groups_member_updated(
+ context, port.get(ext_sg.SECURITYGROUPS))
- self.activate_port_if_ready(context, new_port)
- return self._extend_port_dict_binding(context, new_port)
+ self._update_resource_status(context, "port", port['id'],
+ OperationalStatus.BUILD)
+ self.activate_port_if_ready(context, port)
+ return self._extend_port_dict_binding(context, port)
def update_port(self, context, id, port):
"""Update port, and handle packetfilters associated with the port.
"""
LOG.debug(_("NECPluginV2.update_port() called, "
"id=%(id)s port=%(port)s ."), locals())
- old_port = super(NECPluginV2, self).get_port(context, id)
- new_port = super(NECPluginV2, self).update_port(context, id, port)
-
- changed = (old_port['admin_state_up'] is
- not new_port['admin_state_up'])
+ need_port_update_notify = False
+ with context.session.begin(subtransactions=True):
+ old_port = super(NECPluginV2, self).get_port(context, id)
+ new_port = super(NECPluginV2, self).update_port(context, id, port)
+ need_port_update_notify = self.update_security_group_on_port(
+ context, id, port, old_port, new_port)
+
+ need_port_update_notify |= self.is_security_group_member_updated(
+ context, old_port, new_port)
+ if need_port_update_notify:
+ self.notifier.port_update(context, new_port)
+
+ changed = (old_port['admin_state_up'] != new_port['admin_state_up'])
if changed:
if new_port['admin_state_up']:
self.activate_port_if_ready(context, new_port)
else:
self.deactivate_port(context, old_port)
+ # NOTE: _extend_port_dict_security_group() is called in
+ # update_security_group_on_port() above, so we don't need to
+ # call it here.
return self._extend_port_dict_binding(context, new_port)
def delete_port(self, context, id, l3_port_check=True):
"""Delete port and packet_filters associated with the port."""
LOG.debug(_("NECPluginV2.delete_port() called, id=%s ."), id)
- port = super(NECPluginV2, self).get_port(context, id)
+ # ext_sg.SECURITYGROUPS attribute for the port is required
+ # since notifier.security_groups_member_updated() need the attribute.
+ # Thus we need to call self.get_port() instead of super().get_port()
+ port = self.get_port(context, id)
self.deactivate_port(context, port)
# and l3-router. If so, we should prevent deletion.
if l3_port_check:
self.prevent_l3_port_deletion(context, id)
- self.disassociate_floatingips(context, id)
- super(NECPluginV2, self).delete_port(context, id)
+ with context.session.begin(subtransactions=True):
+ self.disassociate_floatingips(context, id)
+ self._delete_port_security_group_bindings(context, id)
+ super(NECPluginV2, self).delete_port(context, id)
+ self.notifier.security_groups_member_updated(
+ context, port.get(ext_sg.SECURITYGROUPS))
def get_port(self, context, id, fields=None):
- session = context.session
- with session.begin(subtransactions=True):
+ with context.session.begin(subtransactions=True):
port = super(NECPluginV2, self).get_port(context, id, fields)
+ self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port)
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
- session = context.session
- with session.begin(subtransactions=True):
+ with context.session.begin(subtransactions=True):
ports = super(NECPluginV2, self).get_ports(context, filters,
fields)
+ # TODO(amotoki) filter by security group
for port in ports:
+ self._extend_port_dict_security_group(context, port)
self._extend_port_dict_binding(context, port)
return [self._fields(port, fields) for port in ports]
super(NECPluginV2, self).delete_packet_filter(context, id)
-class NECPluginV2RPCCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
- l3_rpc_base.L3RpcCallbackMixin):
+class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
+ sg_rpc.SecurityGroupAgentRpcApiMixin):
+ '''RPC API for NEC plugin agent'''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(NECPluginV2AgentNotifierApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic_port_update = topics.get_topic_name(
+ topic, topics.PORT, topics.UPDATE)
+
+ def port_update(self, context, port):
+ self.fanout_cast(context,
+ self.make_msg('port_update',
+ port=port),
+ topic=self.topic_port_update)
+
+
+class DhcpRpcCallback(dhcp_rpc_base.DhcpRpcCallbackMixin):
+ # DhcpPluginApi BASE_RPC_API_VERSION
+ RPC_API_VERSION = '1.0'
+
+
+class L3RpcCallback(l3_rpc_base.L3RpcCallbackMixin):
+ # L3PluginApi BASE_RPC_API_VERSION
+ RPC_API_VERSION = '1.0'
+
+
+class SecurityGroupServerRpcCallback(
+ sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+
+ RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
+
+ @staticmethod
+ def get_port_from_device(device):
+ port = ndb.get_port_from_device(device)
+ if port:
+ port['device'] = device
+ LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
+ "device=%(device)s => %(ret)s."),
+ {'device': device, 'ret': port})
+ return port
+
+
+class NECPluginV2RPCCallbacks(object):
RPC_API_VERSION = '1.0'
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013, NEC Corporation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+
+import mock
+
+from quantum.api.v2 import attributes
+from quantum.extensions import securitygroup as ext_sg
+from quantum import manager
+from quantum.plugins.nec.db import api as ndb
+from quantum.tests.unit import test_extension_security_group as test_sg
+from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
+
+PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
+AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
+NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
+
+
+class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
+ _plugin_name = PLUGIN_NAME
+
+ def setUp(self, plugin=None):
+ self.addCleanup(mock.patch.stopall)
+ notifier_p = mock.patch(NOTIFIER)
+ notifier_cls = notifier_p.start()
+ self.notifier = mock.Mock()
+ notifier_cls.return_value = self.notifier
+ self._attribute_map_bk_ = {}
+ for item in attributes.RESOURCE_ATTRIBUTE_MAP:
+ self._attribute_map_bk_[item] = (attributes.
+ RESOURCE_ATTRIBUTE_MAP[item].
+ copy())
+ super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+
+ def tearDown(self):
+ super(NecSecurityGroupsTestCase, self).tearDown()
+ attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
+
+
+class TestNecSecurityGroups(NecSecurityGroupsTestCase,
+ test_sg.TestSecurityGroups,
+ test_sg_rpc.SGNotificationTestMixin):
+
+ def test_security_group_get_port_from_device(self):
+ with contextlib.nested(self.network(),
+ self.security_group()) as (n, sg):
+ with self.subnet(n):
+ res = self._create_port(self.fmt, n['network']['id'])
+ port = self.deserialize(self.fmt, res)
+ port_id = port['port']['id']
+ sg_id = sg['security_group']['id']
+ fixed_ips = port['port']['fixed_ips']
+
+ data = {'port': {'fixed_ips': fixed_ips,
+ 'name': port['port']['name'],
+ ext_sg.SECURITYGROUPS: [sg_id]}}
+ req = self.new_update_request('ports', data, port_id)
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+
+ plugin = manager.QuantumManager.get_plugin()
+ port_dict = plugin.callback_sg.get_port_from_device(port_id)
+ self.assertEqual(port_id, port_dict['id'])
+ self.assertEqual([sg_id],
+ port_dict[ext_sg.SECURITYGROUPS])
+ self.assertEqual([], port_dict['security_group_rules'])
+ self.assertEqual([fixed_ips[0]['ip_address']],
+ port_dict['fixed_ips'])
+ self._delete('ports', port_id)
+
+
+class TestNecSecurityGroupsXML(TestNecSecurityGroups):
+ fmt = 'xml'