From: Sylvain Afchain Date: Mon, 22 Jul 2013 12:45:34 +0000 (+0200) Subject: Add l2 population base classes X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=95bf8e6a4068508bed63574b36d90fdecb289969;p=openstack-build%2Fneutron-build.git Add l2 population base classes This patch initiates the blueprint l2-population Implemented as a ml2 Mechanism driver. OVS & LinuxBridge drivers will be added as dependencies. Rebased on ML2 Portbinding. Change-Id: Ia2345aa262ec791c9f38b6e41e1e4b46f69cadac --- diff --git a/neutron/agent/l2population_rpc.py b/neutron/agent/l2population_rpc.py new file mode 100644 index 000000000..c9d131aaf --- /dev/null +++ b/neutron/agent/l2population_rpc.py @@ -0,0 +1,46 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +import abc + +from oslo.config import cfg + +from neutron.common import log + + +class L2populationRpcCallBackMixin(object): + __metaclass__ = abc.ABCMeta + + @log.log + def add_fdb_entries(self, context, fdb_entries, host=None): + if not host or host == cfg.CONF.host: + self.fdb_add(context, fdb_entries) + + @log.log + def remove_fdb_entries(self, context, fdb_entries, host=None): + if not host or host == cfg.CONF.host: + self.fdb_remove(context, fdb_entries) + + @abc.abstractmethod + def fdb_add(self, context, fdb_entries): + pass + + @abc.abstractmethod + def fdb_remove(self, context, fdb_entries): + pass diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 30e10ca56..16f1cfb02 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -15,6 +15,8 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools + from neutron.common import topics from neutron.openstack.common import log as logging @@ -31,16 +33,25 @@ def create_consumers(dispatcher, prefix, topic_details): :param dispatcher: The dispatcher to process the incoming messages. :param prefix: Common prefix for the plugin/agent message queues. - :param topic_details: A list of topics. Each topic has a name and a - operation. + :param topic_details: A list of topics. Each topic has a name, an + operation, and an optional host param keying the + subscription to topic.host for plugin calls. :returns: A common Connection. """ connection = rpc.create_connection(new=True) - for topic, operation in topic_details: + for details in topic_details: + topic, operation, node_name = itertools.islice( + itertools.chain(details, [None]), 3) + topic_name = topics.get_topic_name(prefix, topic, operation) connection.create_consumer(topic_name, dispatcher, fanout=True) + if node_name: + node_topic_name = '%s.%s' % (topic_name, node_name) + connection.create_consumer(node_topic_name, + dispatcher, + fanout=False) connection.consume_in_thread() return connection diff --git a/neutron/common/constants.py b/neutron/common/constants.py index 6454915e1..03af7373c 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -44,6 +44,7 @@ DHCP_RESPONSE_PORT = 68 MIN_VLAN_TAG = 1 MAX_VLAN_TAG = 4094 +FLOODING_ENTRY = ['00:00:00:00:00:00', '0.0.0.0'] EXT_NS_COMP = '_backward_comp_e_ns' EXT_NS = '_extension_ns' diff --git a/neutron/common/topics.py b/neutron/common/topics.py index df9794faf..058383eff 100644 --- a/neutron/common/topics.py +++ b/neutron/common/topics.py @@ -17,6 +17,7 @@ NETWORK = 'network' SUBNET = 'subnet' PORT = 'port' SECURITY_GROUP = 'security_group' +L2POPULATION = 'l2population' CREATE = 'create' DELETE = 'delete' @@ -34,7 +35,7 @@ DHCP_AGENT = 'dhcp_agent' METERING_AGENT = 'metering_agent' -def get_topic_name(prefix, table, operation): +def get_topic_name(prefix, table, operation, host=None): """Create a topic name. The topic name needs to be synced between the agent and the @@ -46,6 +47,9 @@ def get_topic_name(prefix, table, operation): :param table: The table in question (NETWORK, SUBNET, PORT). :param operation: The operation that invokes notification (CREATE, DELETE, UPDATE) + :param host: Add host to the topic :returns: The topic name. """ + if host: + return '%s-%s-%s.%s' % (prefix, table, operation, host) return '%s-%s-%s' % (prefix, table, operation) diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index 79c0f92f5..f045f757d 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -601,6 +601,11 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): details['physical_network'], segmentation_id, details['port_id']) + + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, + device, + self.agent_id) else: self.remove_port_binding(details['network_id'], details['port_id']) diff --git a/neutron/plugins/ml2/drivers/l2pop/__init__.py b/neutron/plugins/ml2/drivers/l2pop/__init__.py new file mode 100644 index 000000000..b9b2306f9 --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange diff --git a/neutron/plugins/ml2/drivers/l2pop/config.py b/neutron/plugins/ml2/drivers/l2pop/config.py new file mode 100644 index 000000000..1e0701e0b --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/config.py @@ -0,0 +1,29 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +from oslo.config import cfg + + +l2_population_options = [ + cfg.IntOpt('agent_boot_time', default=180, + help=_('Delay within which agent is expected to update ' + 'existing ports whent it restarts')), +] + +cfg.CONF.register_opts(l2_population_options, "l2pop") diff --git a/neutron/plugins/ml2/drivers/l2pop/constants.py b/neutron/plugins/ml2/drivers/l2pop/constants.py new file mode 100644 index 000000000..74ca3a1ab --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/constants.py @@ -0,0 +1,20 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +SUPPORTED_AGENT_TYPES = [] diff --git a/neutron/plugins/ml2/drivers/l2pop/db.py b/neutron/plugins/ml2/drivers/l2pop/db.py new file mode 100644 index 000000000..b176a396b --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/db.py @@ -0,0 +1,76 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +from neutron.db import agents_db +from neutron.db import db_base_plugin_v2 as base_db +from neutron.db import models_v2 +from neutron.openstack.common import jsonutils +from neutron.openstack.common import timeutils +from neutron.plugins.ml2.drivers.l2pop import constants as l2_const +from neutron.plugins.ml2 import models as ml2_models + + +class L2populationDbMixin(base_db.CommonDbMixin): + + def get_agent_ip_by_host(self, session, agent_host): + agent = self.get_agent_by_host(session, agent_host) + if agent: + return self.get_agent_ip(agent) + + def get_agent_ip(self, agent): + configuration = jsonutils.loads(agent.configurations) + return configuration.get('tunneling_ip') + + def get_agent_uptime(self, agent): + return timeutils.delta_seconds(agent.started_at, + agent.heartbeat_timestamp) + + def get_agent_tunnel_types(self, agent): + configuration = jsonutils.loads(agent.configurations) + return configuration.get('tunnel_types') + + def get_agent_by_host(self, session, agent_host): + with session.begin(subtransactions=True): + query = session.query(agents_db.Agent) + query = query.filter(agents_db.Agent.host == agent_host, + agents_db.Agent.agent_type.in_( + l2_const.SUPPORTED_AGENT_TYPES)) + return query.first() + + def get_network_ports(self, session, network_id): + with session.begin(subtransactions=True): + query = session.query(ml2_models.PortBinding, + agents_db.Agent) + query = query.join(agents_db.Agent, + agents_db.Agent.host == + ml2_models.PortBinding.host) + query = query.filter(models_v2.Port.network_id == network_id, + models_v2.Port.admin_state_up == True, + agents_db.Agent.agent_type.in_( + l2_const.SUPPORTED_AGENT_TYPES)) + return query + + def get_agent_network_port_count(self, session, agent_host, network_id): + with session.begin(subtransactions=True): + query = session.query(models_v2.Port) + + query = query.join(ml2_models.PortBinding) + query = query.filter(models_v2.Port.network_id == network_id, + ml2_models.PortBinding.host == agent_host) + return query.count() diff --git a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py new file mode 100644 index 000000000..8ab92da25 --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py @@ -0,0 +1,198 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +from oslo.config import cfg + +from neutron.common import constants as const +from neutron import context as n_context +from neutron.db import api as db_api +from neutron.openstack.common import log as logging +from neutron.plugins.ml2 import driver_api as api +from neutron.plugins.ml2.drivers.l2pop import config # noqa +from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db +from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc + +LOG = logging.getLogger(__name__) + + +class L2populationMechanismDriver(api.MechanismDriver, + l2pop_db.L2populationDbMixin): + + def initialize(self): + LOG.debug(_("Experimental L2 population driver")) + + def _get_port_fdb_entries(self, port): + return [[port['mac_address'], + ip['ip_address']] for ip in port['fixed_ips']] + + def delete_port_precommit(self, context): + self.remove_fdb_entries = self._update_port_down(context) + + def delete_port_postcommit(self, context): + self._notify_remove_fdb_entries(context, + self.remove_fdb_entries) + + def _notify_remove_fdb_entries(self, context, fdb_entries): + rpc_ctx = n_context.get_admin_context_without_session() + l2pop_rpc.L2populationAgentNotify.remove_fdb_entries( + rpc_ctx, fdb_entries) + + def update_port_postcommit(self, context): + port = context.current + orig = context.original + + if port['status'] == orig['status']: + return + + if port['status'] == const.PORT_STATUS_ACTIVE: + self._update_port_up(context) + elif port['status'] == const.PORT_STATUS_DOWN: + fdb_entries = self._update_port_down(context) + self._notify_remove_fdb_entries(context, fdb_entries) + + def _update_port_up(self, context): + port_context = context.current + network_id = port_context['network_id'] + agent_host = port_context['binding:host_id'] + if not agent_host: + return + + session = db_api.get_session() + agent = self.get_agent_by_host(session, agent_host) + if not agent: + return + + agent_ip = self.get_agent_ip(agent) + if not agent_ip: + LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"), + agent_host) + return + + segment = context.bound_segment + if not segment: + LOG.warning(_("Port %(port)s updated by agent %(agent)s " + "isn't bound to any segment"), + {'port': port_context['id'], 'agent': agent.host}) + return + + tunnel_types = self.get_agent_tunnel_types(agent) + if segment['network_type'] not in tunnel_types: + return + + agent_ports = self.get_agent_network_port_count(session, agent_host, + network_id) + + rpc_ctx = n_context.get_admin_context_without_session() + + other_fdb_entries = {network_id: + {'segment_id': segment['segmentation_id'], + 'network_type': segment['network_type'], + 'ports': {agent_ip: []}}} + + if agent_ports == 1 or ( + self.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time): + # First port plugged on current agent in this network, + # we have to provide it with the whole list of fdb entries + agent_fdb_entries = {network_id: + {'segment_id': segment['segmentation_id'], + 'network_type': segment['network_type'], + 'ports': {}}} + ports = agent_fdb_entries[network_id]['ports'] + + network_ports = self.get_network_ports(session, network_id) + for network_port in network_ports: + binding, agent = network_port + if agent.host == agent_host: + continue + + ip = self.get_agent_ip(agent) + if not ip: + LOG.debug(_("Unable to retrieve the agent ip, check " + "the agent %(agent_host)s configuration."), + {'agent_host': agent.host}) + continue + + agent_ports = ports.get(ip, [const.FLOODING_ENTRY]) + agent_ports += self._get_port_fdb_entries(binding.port) + ports[ip] = agent_ports + + # And notify other agents to add flooding entry + other_fdb_entries[network_id]['ports'][agent_ip].append( + const.FLOODING_ENTRY) + + if ports.keys(): + l2pop_rpc.L2populationAgentNotify.add_fdb_entries( + rpc_ctx, agent_fdb_entries, agent_host) + + # Notify other agents to add fdb rule for current port + fdb_entries = self._get_port_fdb_entries(port_context) + other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries + + l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx, + other_fdb_entries) + + def _update_port_down(self, context): + port_context = context.current + network_id = port_context['network_id'] + + agent_host = port_context['binding:host_id'] + if not agent_host: + return + + session = db_api.get_session() + agent = self.get_agent_by_host(session, agent_host) + if not agent: + return + + agent_ip = self.get_agent_ip(agent) + if not agent_ip: + LOG.warning(_("Unable to retrieve the agent ip, check the agent " + "configuration.")) + return + + segment = context.bound_segment + if not segment: + LOG.warning(_("Port %(port)s updated by agent %(agent)s " + "isn't bound to any segment"), + {'port': port_context['id'], 'agent': agent}) + return + + tunnel_types = self.get_agent_tunnel_types(agent) + if segment['network_type'] not in tunnel_types: + return + + agent_ports = self.get_agent_network_port_count(session, agent_host, + network_id) + + other_fdb_entries = {network_id: + {'segment_id': segment['segmentation_id'], + 'network_type': segment['network_type'], + 'ports': {agent_ip: []}}} + + if agent_ports == 1: + # Agent is removing its last port in this network, + # other agents needs to be notified to delete their flooding entry. + other_fdb_entries[network_id]['ports'][agent_ip].append( + const.FLOODING_ENTRY) + + # Notify other agents to remove fdb rule for current port + fdb_entries = self._get_port_fdb_entries(port_context) + other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries + + return other_fdb_entries diff --git a/neutron/plugins/ml2/drivers/l2pop/rpc.py b/neutron/plugins/ml2/drivers/l2pop/rpc.py new file mode 100644 index 000000000..176f11820 --- /dev/null +++ b/neutron/plugins/ml2/drivers/l2pop/rpc.py @@ -0,0 +1,79 @@ +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +from neutron.common import topics +from neutron.openstack.common import log as logging +from neutron.openstack.common.rpc import proxy + + +LOG = logging.getLogger(__name__) + + +class L2populationAgentNotifyAPI(proxy.RpcProxy): + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic=topics.AGENT): + super(L2populationAgentNotifyAPI, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + self.topic_l2pop_update = topics.get_topic_name(topic, + topics.L2POPULATION, + topics.UPDATE) + + def _notification_fanout(self, context, method, fdb_entries): + LOG.debug(_('Fanout notify l2population agents at %(topic)s ' + 'the message %(method)s with %(fdb_entries)s'), + {'topic': self.topic, + 'method': method, + 'fdb_entries': fdb_entries}) + + self.fanout_cast(context, + self.make_msg(method, fdb_entries=fdb_entries), + topic=self.topic_l2pop_update) + + def _notification_host(self, context, method, fdb_entries, host): + LOG.debug(_('Notify l2population agent %(host)s at %(topic)s the ' + 'message %(method)s with %(fdb_entries)s'), + {'host': host, + 'topic': self.topic, + 'method': method, + 'fdb_entries': fdb_entries}) + self.cast(context, + self.make_msg(method, fdb_entries=fdb_entries), + topic='%s.%s' % (self.topic_l2pop_update, host)) + + def add_fdb_entries(self, context, fdb_entries, host=None): + if fdb_entries: + if host: + self._notification_host(context, 'add_fdb_entries', + fdb_entries, host) + else: + self._notification_fanout(context, 'add_fdb_entries', + fdb_entries) + + def remove_fdb_entries(self, context, fdb_entries, host=None): + if fdb_entries: + if host: + self._notification_host(context, 'remove_fdb_entries', + fdb_entries, host) + else: + self._notification_fanout(context, 'remove_fdb_entries', + fdb_entries) + +L2populationAgentNotify = L2populationAgentNotifyAPI() diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 25de47f33..1d456f6ef 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -519,8 +519,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, network = self.get_network(context, port['network_id']) mech_context = driver_context.PortContext(self, context, port, network) - self._delete_port_binding(mech_context) self.mechanism_manager.delete_port_precommit(mech_context) + self._delete_port_binding(mech_context) self._delete_port_security_group_bindings(context, id) super(Ml2Plugin, self).delete_port(context, id) @@ -532,3 +532,30 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # fact that an error occurred. pass self.notify_security_groups_member_updated(context, port) + + def update_port_status(self, context, port_id, status): + updated = False + session = context.session + with session.begin(subtransactions=True): + port = db.get_port(session, port_id) + if not port: + LOG.warning(_("Port %(port)s updated up by agent not found"), + {'port': port_id}) + return False + + if port.status != status: + original_port = self._make_port_dict(port) + port.status = status + updated_port = self._make_port_dict(port) + network = self.get_network(context, + original_port['network_id']) + mech_context = driver_context.PortContext( + self, context, updated_port, network, + original_port=original_port) + self.mechanism_manager.update_port_precommit(mech_context) + updated = True + + if updated: + self.mechanism_manager.update_port_postcommit(mech_context) + + return True diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 6ad453a6d..4ead0e339 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -21,6 +21,7 @@ from neutron.db import agents_db from neutron.db import api as db_api from neutron.db import dhcp_rpc_base from neutron.db import securitygroups_rpc_base as sg_db_rpc +from neutron import manager from neutron.openstack.common import log from neutron.openstack.common.rpc import proxy from neutron.plugins.ml2 import db @@ -128,7 +129,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, 'vif_type': binding.vif_type}) return {'device': device} - new_status = (q_const.PORT_STATUS_ACTIVE if port.admin_state_up + new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up else q_const.PORT_STATUS_DOWN) if port.status != new_status: port.status = new_status @@ -157,19 +158,12 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, {'device': device, 'agent_id': agent_id}) port_id = self._device_to_port_id(device) - session = db_api.get_session() - with session.begin(subtransactions=True): - port = db.get_port(session, port_id) - if not port: - LOG.warning(_("Device %(device)s updated down by agent " - "%(agent_id)s not found in database"), - {'device': device, 'agent_id': agent_id}) - return {'device': device, - 'exists': False} - if port.status != q_const.PORT_STATUS_DOWN: - port.status = q_const.PORT_STATUS_DOWN - return {'device': device, - 'exists': True} + plugin = manager.NeutronManager.get_plugin() + port_exists = plugin.update_port_status(rpc_context, port_id, + q_const.PORT_STATUS_DOWN) + + return {'device': device, + 'exists': port_exists} def update_device_up(self, rpc_context, **kwargs): """Device is up on agent.""" @@ -179,15 +173,9 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, {'device': device, 'agent_id': agent_id}) port_id = self._device_to_port_id(device) - session = db_api.get_session() - with session.begin(subtransactions=True): - port = db.get_port(session, port_id) - if not port: - LOG.warning(_("Device %(device)s updated up by agent " - "%(agent_id)s not found in database"), - {'device': device, 'agent_id': agent_id}) - if port.status != q_const.PORT_STATUS_ACTIVE: - port.status = q_const.PORT_STATUS_ACTIVE + plugin = manager.NeutronManager.get_plugin() + plugin.update_port_status(rpc_context, port_id, + q_const.PORT_STATUS_ACTIVE) class AgentNotifierApi(proxy.RpcProxy, diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 3a2435ee5..5a5182dc7 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -782,6 +782,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): details['physical_network'], details['segmentation_id'], details['admin_state_up']) + + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, + device, + self.agent_id) else: LOG.debug(_("Device %s not defined on plugin"), device) if (port and int(port.ofport) != -1): @@ -801,6 +806,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): {'device': device, 'e': e}) resync = True continue + + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, + device, + self.agent_id) return resync def treat_devices_removed(self, devices): diff --git a/neutron/tests/unit/ml2/drivers/test_l2population.py b/neutron/tests/unit/ml2/drivers/test_l2population.py new file mode 100644 index 000000000..718926a6e --- /dev/null +++ b/neutron/tests/unit/ml2/drivers/test_l2population.py @@ -0,0 +1,408 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Sylvain Afchain, eNovance SAS +# @author: Francois Eleouet, Orange +# @author: Mathieu Rohon, Orange + +import mock + +from neutron.common import constants +from neutron.common import topics +from neutron import context +from neutron.db import agents_db +from neutron.db import api as db_api +from neutron.extensions import portbindings +from neutron.extensions import providernet as pnet +from neutron.openstack.common import timeutils +from neutron.plugins.ml2 import config as config +from neutron.plugins.ml2.drivers.l2pop import constants as l2_consts +from neutron.plugins.ml2 import managers +from neutron.plugins.ml2 import rpc +from neutron.tests.unit import test_db_plugin as test_plugin + +HOST = 'my_l2_host' +L2_AGENT = { + 'binary': 'neutron-openvswitch-agent', + 'host': HOST, + 'topic': constants.L2_AGENT_TOPIC, + 'configurations': {'tunneling_ip': '20.0.0.1', + 'tunnel_types': ['vxlan']}, + 'agent_type': constants.AGENT_TYPE_OVS, + 'tunnel_type': [], + 'start_flag': True +} + +L2_AGENT_2 = { + 'binary': 'neutron-openvswitch-agent', + 'host': HOST + '_2', + 'topic': constants.L2_AGENT_TOPIC, + 'configurations': {'tunneling_ip': '20.0.0.2', + 'tunnel_types': ['vxlan']}, + 'agent_type': constants.AGENT_TYPE_OVS, + 'tunnel_type': [], + 'start_flag': True +} + +L2_AGENT_3 = { + 'binary': 'neutron-openvswitch-agent', + 'host': HOST + '_3', + 'topic': constants.L2_AGENT_TOPIC, + 'configurations': {'tunneling_ip': '20.0.0.2', + 'tunnel_types': []}, + 'agent_type': constants.AGENT_TYPE_OVS, + 'tunnel_type': [], + 'start_flag': True +} + +PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin' +NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' + + +class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase): + + def setUp(self): + # Enable the test mechanism driver to ensure that + # we can successfully call through to all mechanism + # driver apis. + config.cfg.CONF.set_override('mechanism_drivers', + ['openvswitch', 'linuxbridge', + 'l2population'], + 'ml2') + super(TestL2PopulationRpcTestCase, self).setUp(PLUGIN_NAME) + self.addCleanup(config.cfg.CONF.reset) + self.port_create_status = 'DOWN' + + self.adminContext = context.get_admin_context() + + self.type_manager = managers.TypeManager() + self.notifier = rpc.AgentNotifierApi(topics.AGENT) + self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager) + + self.orig_supported_agents = l2_consts.SUPPORTED_AGENT_TYPES + l2_consts.SUPPORTED_AGENT_TYPES = [constants.AGENT_TYPE_OVS] + + net_arg = {pnet.NETWORK_TYPE: 'vxlan', + pnet.SEGMENTATION_ID: '1'} + self._network = self._make_network(self.fmt, 'net1', True, + arg_list=(pnet.NETWORK_TYPE, + pnet.SEGMENTATION_ID,), + **net_arg) + + notifier_patch = mock.patch(NOTIFIER) + notifier_patch.start() + + self.fanout_topic = topics.get_topic_name(topics.AGENT, + topics.L2POPULATION, + topics.UPDATE) + fanout = ('neutron.openstack.common.rpc.proxy.RpcProxy.fanout_cast') + fanout_patch = mock.patch(fanout) + self.mock_fanout = fanout_patch.start() + + cast = ('neutron.openstack.common.rpc.proxy.RpcProxy.cast') + cast_patch = mock.patch(cast) + self.mock_cast = cast_patch.start() + + uptime = ('neutron.plugins.ml2.drivers.l2pop.db.L2populationDbMixin.' + 'get_agent_uptime') + uptime_patch = mock.patch(uptime, return_value=190) + uptime_patch.start() + + self.addCleanup(mock.patch.stopall) + self.addCleanup(db_api.clear_db) + + def tearDown(self): + l2_consts.SUPPORTED_AGENT_TYPES = self.orig_supported_agents + super(TestL2PopulationRpcTestCase, self).tearDown() + + def _register_ml2_agents(self): + callback = agents_db.AgentExtRpcCallback() + callback.report_state(self.adminContext, + agent_state={'agent_state': L2_AGENT}, + time=timeutils.strtime()) + callback.report_state(self.adminContext, + agent_state={'agent_state': L2_AGENT_2}, + time=timeutils.strtime()) + callback.report_state(self.adminContext, + agent_state={'agent_state': L2_AGENT_3}, + time=timeutils.strtime()) + + def test_fdb_add_called(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg): + p1 = port1['port'] + + device = 'tap' + p1['id'] + + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [[p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'add_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected, topic=self.fanout_topic) + + def test_fdb_add_not_called_type_local(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST + '_3'} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg): + p1 = port1['port'] + + device = 'tap' + p1['id'] + + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + self.assertFalse(self.mock_fanout.called) + + def test_fdb_add_two_agents(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST, + 'admin_state_up': True} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID, 'admin_state_up',), + **host_arg) as port1: + host_arg = {portbindings.HOST_ID: HOST + '_2', + 'admin_state_up': True} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID, + 'admin_state_up',), + **host_arg) as port2: + p1 = port1['port'] + p2 = port2['port'] + + device = 'tap' + p1['id'] + + self.mock_cast.reset_mock() + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + p2_ips = [p['ip_address'] for p in p2['fixed_ips']] + + expected1 = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.2': [constants.FLOODING_ENTRY, + [p2['mac_address'], + p2_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'add_fdb_entries'} + + topic = topics.get_topic_name(topics.AGENT, + topics.L2POPULATION, + topics.UPDATE, + HOST) + + self.mock_cast.assert_called_with(mock.ANY, + expected1, + topic=topic) + + expected2 = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [constants.FLOODING_ENTRY, + [p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'add_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected2, topic=self.fanout_topic) + + def test_fdb_add_called_two_networks(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port1: + with self.subnet(cidr='10.1.0.0/24') as subnet2: + host_arg = {portbindings.HOST_ID: HOST + '_2'} + with self.port(subnet=subnet2, + arg_list=(portbindings.HOST_ID,), + **host_arg): + p1 = port1['port'] + + device = 'tap' + p1['id'] + + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [constants.FLOODING_ENTRY, + [p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'add_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected, topic=self.fanout_topic) + + def test_fdb_remove_called_from_rpc(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg): + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port: + p1 = port['port'] + + device = 'tap' + p1['id'] + + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + self.callbacks.update_device_down(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [[p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'remove_fdb_entries'} + + self.mock_fanout.assert_called_with( + mock.ANY, expected, topic=self.fanout_topic) + + def test_fdb_remove_called(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg): + + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port: + p1 = port['port'] + + device = 'tap' + p1['id'] + + self.mock_fanout.reset_mock() + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [[p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'remove_fdb_entries'} + + self.mock_fanout.assert_any_call( + mock.ANY, expected, topic=self.fanout_topic) + + def test_fdb_remove_called_last_port(self): + self._register_ml2_agents() + + with self.subnet(network=self._network) as subnet: + host_arg = {portbindings.HOST_ID: HOST} + + with self.port(subnet=subnet, + arg_list=(portbindings.HOST_ID,), + **host_arg) as port: + p1 = port['port'] + + device = 'tap' + p1['id'] + + self.callbacks.update_device_up(self.adminContext, + agent_id=HOST, + device=device) + + p1_ips = [p['ip_address'] for p in p1['fixed_ips']] + expected = {'args': + {'fdb_entries': + {p1['network_id']: + {'ports': + {'20.0.0.1': [constants.FLOODING_ENTRY, + [p1['mac_address'], + p1_ips[0]]]}, + 'network_type': 'vxlan', + 'segment_id': 1}}}, + 'namespace': None, + 'method': 'remove_fdb_entries'} + + self.mock_fanout.assert_any_call( + mock.ANY, expected, topic=self.fanout_topic) diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 546c56c4d..3cdd9d9ee 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -184,8 +184,9 @@ class TestOvsNeutronAgent(base.BaseTestCase): return_value=details), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=port), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent, func_name) - ) as (get_dev_fn, get_vif_func, func): + ) as (get_dev_fn, get_vif_func, upd_dev_up, func): self.assertFalse(self.agent.treat_devices_added([{}])) return func.called diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index 7327b53da..35e9b75fb 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -95,3 +95,19 @@ class AgentRPCMethods(base.BaseTestCase): with mock.patch(call_to_patch) as create_connection: rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')]) create_connection.assert_has_calls(expected) + + def test_create_consumers_with_node_name(self): + dispatcher = mock.Mock() + expected = [ + mock.call(new=True), + mock.call().create_consumer('foo-topic-op', dispatcher, + fanout=True), + mock.call().create_consumer('foo-topic-op.node1', dispatcher, + fanout=False), + mock.call().consume_in_thread() + ] + + call_to_patch = 'neutron.openstack.common.rpc.create_connection' + with mock.patch(call_to_patch) as create_connection: + rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')]) + create_connection.assert_has_calls(expected) diff --git a/setup.cfg b/setup.cfg index 2f1ebfc2c..4876309e1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,7 +51,7 @@ data_files = etc/neutron/plugins/linuxbridge = etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini etc/neutron/plugins/metaplugin = etc/neutron/plugins/metaplugin/metaplugin.ini etc/neutron/plugins/midonet = etc/neutron/plugins/midonet/midonet.ini - etc/neutron/plugins/ml2 = + etc/neutron/plugins/ml2 = etc/neutron/plugins/ml2/ml2_conf.ini etc/neutron/plugins/ml2/ml2_conf_arista.ini etc/neutron/plugins/ml2/ml2_conf_cisco.ini @@ -129,6 +129,7 @@ neutron.ml2.mechanism_drivers = ncs = neutron.plugins.ml2.drivers.mechanism_ncs:NCSMechanismDriver arista = neutron.plugins.ml2.drivers.mech_arista.mechanism_arista:AristaDriver cisco_nexus = neutron.plugins.ml2.drivers.cisco.mech_cisco_nexus:CiscoNexusMechanismDriver + l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver [build_sphinx] all_files = 1