]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add l2 population base classes
authorSylvain Afchain <sylvain.afchain@enovance.com>
Mon, 22 Jul 2013 12:45:34 +0000 (14:45 +0200)
committerFrancois Eleouet <f.eleouet@gmail.com>
Wed, 11 Sep 2013 23:06:51 +0000 (01:06 +0200)
This patch initiates the blueprint l2-population
Implemented as a ml2 Mechanism driver.

OVS & LinuxBridge drivers will be added as dependencies.

Rebased on ML2 Portbinding.

Change-Id: Ia2345aa262ec791c9f38b6e41e1e4b46f69cadac

18 files changed:
neutron/agent/l2population_rpc.py [new file with mode: 0644]
neutron/agent/rpc.py
neutron/common/constants.py
neutron/common/topics.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/ml2/drivers/l2pop/__init__.py [new file with mode: 0644]
neutron/plugins/ml2/drivers/l2pop/config.py [new file with mode: 0644]
neutron/plugins/ml2/drivers/l2pop/constants.py [new file with mode: 0644]
neutron/plugins/ml2/drivers/l2pop/db.py [new file with mode: 0644]
neutron/plugins/ml2/drivers/l2pop/mech_driver.py [new file with mode: 0644]
neutron/plugins/ml2/drivers/l2pop/rpc.py [new file with mode: 0644]
neutron/plugins/ml2/plugin.py
neutron/plugins/ml2/rpc.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/ml2/drivers/test_l2population.py [new file with mode: 0644]
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
neutron/tests/unit/test_agent_rpc.py
setup.cfg

diff --git a/neutron/agent/l2population_rpc.py b/neutron/agent/l2population_rpc.py
new file mode 100644 (file)
index 0000000..c9d131a
--- /dev/null
@@ -0,0 +1,46 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+import abc
+
+from oslo.config import cfg
+
+from neutron.common import log
+
+
+class L2populationRpcCallBackMixin(object):
+    __metaclass__ = abc.ABCMeta
+
+    @log.log
+    def add_fdb_entries(self, context, fdb_entries, host=None):
+        if not host or host == cfg.CONF.host:
+            self.fdb_add(context, fdb_entries)
+
+    @log.log
+    def remove_fdb_entries(self, context, fdb_entries, host=None):
+        if not host or host == cfg.CONF.host:
+            self.fdb_remove(context, fdb_entries)
+
+    @abc.abstractmethod
+    def fdb_add(self, context, fdb_entries):
+        pass
+
+    @abc.abstractmethod
+    def fdb_remove(self, context, fdb_entries):
+        pass
index 30e10ca5665e55bd12d2384672ce9bbc1c8a7791..16f1cfb0293f0658a9404a579289e40b9fd4ca50 100644 (file)
@@ -15,6 +15,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import itertools
+
 from neutron.common import topics
 
 from neutron.openstack.common import log as logging
@@ -31,16 +33,25 @@ def create_consumers(dispatcher, prefix, topic_details):
 
     :param dispatcher: The dispatcher to process the incoming messages.
     :param prefix: Common prefix for the plugin/agent message queues.
-    :param topic_details: A list of topics. Each topic has a name and a
-                          operation.
+    :param topic_details: A list of topics. Each topic has a name, an
+                          operation, and an optional host param keying the
+                          subscription to topic.host for plugin calls.
 
     :returns: A common Connection.
     """
 
     connection = rpc.create_connection(new=True)
-    for topic, operation in topic_details:
+    for details in topic_details:
+        topic, operation, node_name = itertools.islice(
+            itertools.chain(details, [None]), 3)
+
         topic_name = topics.get_topic_name(prefix, topic, operation)
         connection.create_consumer(topic_name, dispatcher, fanout=True)
+        if node_name:
+            node_topic_name = '%s.%s' % (topic_name, node_name)
+            connection.create_consumer(node_topic_name,
+                                       dispatcher,
+                                       fanout=False)
     connection.consume_in_thread()
     return connection
 
index 6454915e18325014e24e7b1db765a36533e37f90..03af7373cf225c580d5bf4a5693532945ac1446e 100644 (file)
@@ -44,6 +44,7 @@ DHCP_RESPONSE_PORT = 68
 
 MIN_VLAN_TAG = 1
 MAX_VLAN_TAG = 4094
+FLOODING_ENTRY = ['00:00:00:00:00:00', '0.0.0.0']
 
 EXT_NS_COMP = '_backward_comp_e_ns'
 EXT_NS = '_extension_ns'
index df9794faf039f06d9d0e68bdfa8e01f1bc123f48..058383eff476df951a310774ef7af4e1d33a5b2d 100644 (file)
@@ -17,6 +17,7 @@ NETWORK = 'network'
 SUBNET = 'subnet'
 PORT = 'port'
 SECURITY_GROUP = 'security_group'
+L2POPULATION = 'l2population'
 
 CREATE = 'create'
 DELETE = 'delete'
@@ -34,7 +35,7 @@ DHCP_AGENT = 'dhcp_agent'
 METERING_AGENT = 'metering_agent'
 
 
-def get_topic_name(prefix, table, operation):
+def get_topic_name(prefix, table, operation, host=None):
     """Create a topic name.
 
     The topic name needs to be synced between the agent and the
@@ -46,6 +47,9 @@ def get_topic_name(prefix, table, operation):
     :param table: The table in question (NETWORK, SUBNET, PORT).
     :param operation: The operation that invokes notification (CREATE,
                       DELETE, UPDATE)
+    :param host: Add host to the topic
     :returns: The topic name.
     """
+    if host:
+        return '%s-%s-%s.%s' % (prefix, table, operation, host)
     return '%s-%s-%s' % (prefix, table, operation)
index 79c0f92f527b64651e8cca1c86f1bdd185d152d6..f045f757dd859e80e67bb636b74e7a30087c81ab 100755 (executable)
@@ -601,6 +601,11 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
                                               details['physical_network'],
                                               segmentation_id,
                                               details['port_id'])
+
+                    # update plugin about port status
+                    self.plugin_rpc.update_device_up(self.context,
+                                                     device,
+                                                     self.agent_id)
                 else:
                     self.remove_port_binding(details['network_id'],
                                              details['port_id'])
diff --git a/neutron/plugins/ml2/drivers/l2pop/__init__.py b/neutron/plugins/ml2/drivers/l2pop/__init__.py
new file mode 100644 (file)
index 0000000..b9b2306
--- /dev/null
@@ -0,0 +1,18 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
diff --git a/neutron/plugins/ml2/drivers/l2pop/config.py b/neutron/plugins/ml2/drivers/l2pop/config.py
new file mode 100644 (file)
index 0000000..1e0701e
--- /dev/null
@@ -0,0 +1,29 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from oslo.config import cfg
+
+
+l2_population_options = [
+    cfg.IntOpt('agent_boot_time', default=180,
+               help=_('Delay within which agent is expected to update '
+                      'existing ports whent it restarts')),
+]
+
+cfg.CONF.register_opts(l2_population_options, "l2pop")
diff --git a/neutron/plugins/ml2/drivers/l2pop/constants.py b/neutron/plugins/ml2/drivers/l2pop/constants.py
new file mode 100644 (file)
index 0000000..74ca3a1
--- /dev/null
@@ -0,0 +1,20 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+SUPPORTED_AGENT_TYPES = []
diff --git a/neutron/plugins/ml2/drivers/l2pop/db.py b/neutron/plugins/ml2/drivers/l2pop/db.py
new file mode 100644 (file)
index 0000000..b176a39
--- /dev/null
@@ -0,0 +1,76 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from neutron.db import agents_db
+from neutron.db import db_base_plugin_v2 as base_db
+from neutron.db import models_v2
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import timeutils
+from neutron.plugins.ml2.drivers.l2pop import constants as l2_const
+from neutron.plugins.ml2 import models as ml2_models
+
+
+class L2populationDbMixin(base_db.CommonDbMixin):
+
+    def get_agent_ip_by_host(self, session, agent_host):
+        agent = self.get_agent_by_host(session, agent_host)
+        if agent:
+            return self.get_agent_ip(agent)
+
+    def get_agent_ip(self, agent):
+        configuration = jsonutils.loads(agent.configurations)
+        return configuration.get('tunneling_ip')
+
+    def get_agent_uptime(self, agent):
+        return timeutils.delta_seconds(agent.started_at,
+                                       agent.heartbeat_timestamp)
+
+    def get_agent_tunnel_types(self, agent):
+        configuration = jsonutils.loads(agent.configurations)
+        return configuration.get('tunnel_types')
+
+    def get_agent_by_host(self, session, agent_host):
+        with session.begin(subtransactions=True):
+            query = session.query(agents_db.Agent)
+            query = query.filter(agents_db.Agent.host == agent_host,
+                                 agents_db.Agent.agent_type.in_(
+                                     l2_const.SUPPORTED_AGENT_TYPES))
+            return query.first()
+
+    def get_network_ports(self, session, network_id):
+        with session.begin(subtransactions=True):
+            query = session.query(ml2_models.PortBinding,
+                                  agents_db.Agent)
+            query = query.join(agents_db.Agent,
+                               agents_db.Agent.host ==
+                               ml2_models.PortBinding.host)
+            query = query.filter(models_v2.Port.network_id == network_id,
+                                 models_v2.Port.admin_state_up == True,
+                                 agents_db.Agent.agent_type.in_(
+                                     l2_const.SUPPORTED_AGENT_TYPES))
+            return query
+
+    def get_agent_network_port_count(self, session, agent_host, network_id):
+        with session.begin(subtransactions=True):
+            query = session.query(models_v2.Port)
+
+            query = query.join(ml2_models.PortBinding)
+            query = query.filter(models_v2.Port.network_id == network_id,
+                                 ml2_models.PortBinding.host == agent_host)
+            return query.count()
diff --git a/neutron/plugins/ml2/drivers/l2pop/mech_driver.py b/neutron/plugins/ml2/drivers/l2pop/mech_driver.py
new file mode 100644 (file)
index 0000000..8ab92da
--- /dev/null
@@ -0,0 +1,198 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from oslo.config import cfg
+
+from neutron.common import constants as const
+from neutron import context as n_context
+from neutron.db import api as db_api
+from neutron.openstack.common import log as logging
+from neutron.plugins.ml2 import driver_api as api
+from neutron.plugins.ml2.drivers.l2pop import config  # noqa
+from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
+from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
+
+LOG = logging.getLogger(__name__)
+
+
+class L2populationMechanismDriver(api.MechanismDriver,
+                                  l2pop_db.L2populationDbMixin):
+
+    def initialize(self):
+        LOG.debug(_("Experimental L2 population driver"))
+
+    def _get_port_fdb_entries(self, port):
+        return [[port['mac_address'],
+                 ip['ip_address']] for ip in port['fixed_ips']]
+
+    def delete_port_precommit(self, context):
+        self.remove_fdb_entries = self._update_port_down(context)
+
+    def delete_port_postcommit(self, context):
+        self._notify_remove_fdb_entries(context,
+                                        self.remove_fdb_entries)
+
+    def _notify_remove_fdb_entries(self, context, fdb_entries):
+        rpc_ctx = n_context.get_admin_context_without_session()
+        l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+            rpc_ctx, fdb_entries)
+
+    def update_port_postcommit(self, context):
+        port = context.current
+        orig = context.original
+
+        if port['status'] == orig['status']:
+            return
+
+        if port['status'] == const.PORT_STATUS_ACTIVE:
+            self._update_port_up(context)
+        elif port['status'] == const.PORT_STATUS_DOWN:
+            fdb_entries = self._update_port_down(context)
+            self._notify_remove_fdb_entries(context, fdb_entries)
+
+    def _update_port_up(self, context):
+        port_context = context.current
+        network_id = port_context['network_id']
+        agent_host = port_context['binding:host_id']
+        if not agent_host:
+            return
+
+        session = db_api.get_session()
+        agent = self.get_agent_by_host(session, agent_host)
+        if not agent:
+            return
+
+        agent_ip = self.get_agent_ip(agent)
+        if not agent_ip:
+            LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
+                        agent_host)
+            return
+
+        segment = context.bound_segment
+        if not segment:
+            LOG.warning(_("Port %(port)s updated by agent %(agent)s "
+                          "isn't bound to any segment"),
+                        {'port': port_context['id'], 'agent': agent.host})
+            return
+
+        tunnel_types = self.get_agent_tunnel_types(agent)
+        if segment['network_type'] not in tunnel_types:
+            return
+
+        agent_ports = self.get_agent_network_port_count(session, agent_host,
+                                                        network_id)
+
+        rpc_ctx = n_context.get_admin_context_without_session()
+
+        other_fdb_entries = {network_id:
+                             {'segment_id': segment['segmentation_id'],
+                              'network_type': segment['network_type'],
+                              'ports': {agent_ip: []}}}
+
+        if agent_ports == 1 or (
+                self.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time):
+            # First port plugged on current agent in this network,
+            # we have to provide it with the whole list of fdb entries
+            agent_fdb_entries = {network_id:
+                                 {'segment_id': segment['segmentation_id'],
+                                  'network_type': segment['network_type'],
+                                  'ports': {}}}
+            ports = agent_fdb_entries[network_id]['ports']
+
+            network_ports = self.get_network_ports(session, network_id)
+            for network_port in network_ports:
+                binding, agent = network_port
+                if agent.host == agent_host:
+                    continue
+
+                ip = self.get_agent_ip(agent)
+                if not ip:
+                    LOG.debug(_("Unable to retrieve the agent ip, check "
+                                "the agent %(agent_host)s configuration."),
+                              {'agent_host': agent.host})
+                    continue
+
+                agent_ports = ports.get(ip, [const.FLOODING_ENTRY])
+                agent_ports += self._get_port_fdb_entries(binding.port)
+                ports[ip] = agent_ports
+
+            # And notify other agents to add flooding entry
+            other_fdb_entries[network_id]['ports'][agent_ip].append(
+                const.FLOODING_ENTRY)
+
+            if ports.keys():
+                l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
+                    rpc_ctx, agent_fdb_entries, agent_host)
+
+        # Notify other agents to add fdb rule for current port
+        fdb_entries = self._get_port_fdb_entries(port_context)
+        other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+
+        l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
+                                                          other_fdb_entries)
+
+    def _update_port_down(self, context):
+        port_context = context.current
+        network_id = port_context['network_id']
+
+        agent_host = port_context['binding:host_id']
+        if not agent_host:
+            return
+
+        session = db_api.get_session()
+        agent = self.get_agent_by_host(session, agent_host)
+        if not agent:
+            return
+
+        agent_ip = self.get_agent_ip(agent)
+        if not agent_ip:
+            LOG.warning(_("Unable to retrieve the agent ip, check the agent "
+                          "configuration."))
+            return
+
+        segment = context.bound_segment
+        if not segment:
+            LOG.warning(_("Port %(port)s updated by agent %(agent)s "
+                          "isn't bound to any segment"),
+                        {'port': port_context['id'], 'agent': agent})
+            return
+
+        tunnel_types = self.get_agent_tunnel_types(agent)
+        if segment['network_type'] not in tunnel_types:
+            return
+
+        agent_ports = self.get_agent_network_port_count(session, agent_host,
+                                                        network_id)
+
+        other_fdb_entries = {network_id:
+                             {'segment_id': segment['segmentation_id'],
+                              'network_type': segment['network_type'],
+                              'ports': {agent_ip: []}}}
+
+        if agent_ports == 1:
+            # Agent is removing its last port in this network,
+            # other agents needs to be notified to delete their flooding entry.
+            other_fdb_entries[network_id]['ports'][agent_ip].append(
+                const.FLOODING_ENTRY)
+
+        # Notify other agents to remove fdb rule for current port
+        fdb_entries = self._get_port_fdb_entries(port_context)
+        other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+
+        return other_fdb_entries
diff --git a/neutron/plugins/ml2/drivers/l2pop/rpc.py b/neutron/plugins/ml2/drivers/l2pop/rpc.py
new file mode 100644 (file)
index 0000000..176f118
--- /dev/null
@@ -0,0 +1,79 @@
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from neutron.common import topics
+from neutron.openstack.common import log as logging
+from neutron.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L2populationAgentNotifyAPI(proxy.RpcProxy):
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic=topics.AGENT):
+        super(L2populationAgentNotifyAPI, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+        self.topic_l2pop_update = topics.get_topic_name(topic,
+                                                        topics.L2POPULATION,
+                                                        topics.UPDATE)
+
+    def _notification_fanout(self, context, method, fdb_entries):
+        LOG.debug(_('Fanout notify l2population agents at %(topic)s '
+                    'the message %(method)s with %(fdb_entries)s'),
+                  {'topic': self.topic,
+                   'method': method,
+                   'fdb_entries': fdb_entries})
+
+        self.fanout_cast(context,
+                         self.make_msg(method, fdb_entries=fdb_entries),
+                         topic=self.topic_l2pop_update)
+
+    def _notification_host(self, context, method, fdb_entries, host):
+        LOG.debug(_('Notify l2population agent %(host)s at %(topic)s the '
+                    'message %(method)s with %(fdb_entries)s'),
+                  {'host': host,
+                   'topic': self.topic,
+                   'method': method,
+                   'fdb_entries': fdb_entries})
+        self.cast(context,
+                  self.make_msg(method, fdb_entries=fdb_entries),
+                  topic='%s.%s' % (self.topic_l2pop_update, host))
+
+    def add_fdb_entries(self, context, fdb_entries, host=None):
+        if fdb_entries:
+            if host:
+                self._notification_host(context, 'add_fdb_entries',
+                                        fdb_entries, host)
+            else:
+                self._notification_fanout(context, 'add_fdb_entries',
+                                          fdb_entries)
+
+    def remove_fdb_entries(self, context, fdb_entries, host=None):
+        if fdb_entries:
+            if host:
+                self._notification_host(context, 'remove_fdb_entries',
+                                        fdb_entries, host)
+            else:
+                self._notification_fanout(context, 'remove_fdb_entries',
+                                          fdb_entries)
+
+L2populationAgentNotify = L2populationAgentNotifyAPI()
index 25de47f33ff098fd302fc752538a6c54c61147fe..1d456f6efcb173002531194311131370d0db4ec6 100644 (file)
@@ -519,8 +519,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             network = self.get_network(context, port['network_id'])
             mech_context = driver_context.PortContext(self, context, port,
                                                       network)
-            self._delete_port_binding(mech_context)
             self.mechanism_manager.delete_port_precommit(mech_context)
+            self._delete_port_binding(mech_context)
             self._delete_port_security_group_bindings(context, id)
             super(Ml2Plugin, self).delete_port(context, id)
 
@@ -532,3 +532,30 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             # fact that an error occurred.
             pass
         self.notify_security_groups_member_updated(context, port)
+
+    def update_port_status(self, context, port_id, status):
+        updated = False
+        session = context.session
+        with session.begin(subtransactions=True):
+            port = db.get_port(session, port_id)
+            if not port:
+                LOG.warning(_("Port %(port)s updated up by agent not found"),
+                            {'port': port_id})
+                return False
+
+            if port.status != status:
+                original_port = self._make_port_dict(port)
+                port.status = status
+                updated_port = self._make_port_dict(port)
+                network = self.get_network(context,
+                                           original_port['network_id'])
+                mech_context = driver_context.PortContext(
+                    self, context, updated_port, network,
+                    original_port=original_port)
+                self.mechanism_manager.update_port_precommit(mech_context)
+                updated = True
+
+        if updated:
+            self.mechanism_manager.update_port_postcommit(mech_context)
+
+        return True
index 6ad453a6dd6b8347896d4803709daf712a06682d..4ead0e339422a7e9d9f5565765288b17a2441362 100644 (file)
@@ -21,6 +21,7 @@ from neutron.db import agents_db
 from neutron.db import api as db_api
 from neutron.db import dhcp_rpc_base
 from neutron.db import securitygroups_rpc_base as sg_db_rpc
+from neutron import manager
 from neutron.openstack.common import log
 from neutron.openstack.common.rpc import proxy
 from neutron.plugins.ml2 import db
@@ -128,7 +129,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                              'vif_type': binding.vif_type})
                 return {'device': device}
 
-            new_status = (q_const.PORT_STATUS_ACTIVE if port.admin_state_up
+            new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
                           else q_const.PORT_STATUS_DOWN)
             if port.status != new_status:
                 port.status = new_status
@@ -157,19 +158,12 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                   {'device': device, 'agent_id': agent_id})
         port_id = self._device_to_port_id(device)
 
-        session = db_api.get_session()
-        with session.begin(subtransactions=True):
-            port = db.get_port(session, port_id)
-            if not port:
-                LOG.warning(_("Device %(device)s updated down by agent "
-                              "%(agent_id)s not found in database"),
-                            {'device': device, 'agent_id': agent_id})
-                return {'device': device,
-                        'exists': False}
-            if port.status != q_const.PORT_STATUS_DOWN:
-                port.status = q_const.PORT_STATUS_DOWN
-            return {'device': device,
-                    'exists': True}
+        plugin = manager.NeutronManager.get_plugin()
+        port_exists = plugin.update_port_status(rpc_context, port_id,
+                                                q_const.PORT_STATUS_DOWN)
+
+        return {'device': device,
+                'exists': port_exists}
 
     def update_device_up(self, rpc_context, **kwargs):
         """Device is up on agent."""
@@ -179,15 +173,9 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                   {'device': device, 'agent_id': agent_id})
         port_id = self._device_to_port_id(device)
 
-        session = db_api.get_session()
-        with session.begin(subtransactions=True):
-            port = db.get_port(session, port_id)
-            if not port:
-                LOG.warning(_("Device %(device)s updated up by agent "
-                              "%(agent_id)s not found in database"),
-                            {'device': device, 'agent_id': agent_id})
-            if port.status != q_const.PORT_STATUS_ACTIVE:
-                port.status = q_const.PORT_STATUS_ACTIVE
+        plugin = manager.NeutronManager.get_plugin()
+        plugin.update_port_status(rpc_context, port_id,
+                                  q_const.PORT_STATUS_ACTIVE)
 
 
 class AgentNotifierApi(proxy.RpcProxy,
index 3a2435ee5a7ebc263e22adf34462c3baef05a994..5a5182dc7f70d20ff55c6d6af0a63ea533a29cbe 100644 (file)
@@ -782,6 +782,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                                     details['physical_network'],
                                     details['segmentation_id'],
                                     details['admin_state_up'])
+
+                # update plugin about port status
+                self.plugin_rpc.update_device_up(self.context,
+                                                 device,
+                                                 self.agent_id)
             else:
                 LOG.debug(_("Device %s not defined on plugin"), device)
                 if (port and int(port.ofport) != -1):
@@ -801,6 +806,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                           {'device': device, 'e': e})
                 resync = True
                 continue
+
+            # update plugin about port status
+            self.plugin_rpc.update_device_up(self.context,
+                                             device,
+                                             self.agent_id)
         return resync
 
     def treat_devices_removed(self, devices):
diff --git a/neutron/tests/unit/ml2/drivers/test_l2population.py b/neutron/tests/unit/ml2/drivers/test_l2population.py
new file mode 100644 (file)
index 0000000..718926a
--- /dev/null
@@ -0,0 +1,408 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+import mock
+
+from neutron.common import constants
+from neutron.common import topics
+from neutron import context
+from neutron.db import agents_db
+from neutron.db import api as db_api
+from neutron.extensions import portbindings
+from neutron.extensions import providernet as pnet
+from neutron.openstack.common import timeutils
+from neutron.plugins.ml2 import config as config
+from neutron.plugins.ml2.drivers.l2pop import constants as l2_consts
+from neutron.plugins.ml2 import managers
+from neutron.plugins.ml2 import rpc
+from neutron.tests.unit import test_db_plugin as test_plugin
+
+HOST = 'my_l2_host'
+L2_AGENT = {
+    'binary': 'neutron-openvswitch-agent',
+    'host': HOST,
+    'topic': constants.L2_AGENT_TOPIC,
+    'configurations': {'tunneling_ip': '20.0.0.1',
+                       'tunnel_types': ['vxlan']},
+    'agent_type': constants.AGENT_TYPE_OVS,
+    'tunnel_type': [],
+    'start_flag': True
+}
+
+L2_AGENT_2 = {
+    'binary': 'neutron-openvswitch-agent',
+    'host': HOST + '_2',
+    'topic': constants.L2_AGENT_TOPIC,
+    'configurations': {'tunneling_ip': '20.0.0.2',
+                       'tunnel_types': ['vxlan']},
+    'agent_type': constants.AGENT_TYPE_OVS,
+    'tunnel_type': [],
+    'start_flag': True
+}
+
+L2_AGENT_3 = {
+    'binary': 'neutron-openvswitch-agent',
+    'host': HOST + '_3',
+    'topic': constants.L2_AGENT_TOPIC,
+    'configurations': {'tunneling_ip': '20.0.0.2',
+                       'tunnel_types': []},
+    'agent_type': constants.AGENT_TYPE_OVS,
+    'tunnel_type': [],
+    'start_flag': True
+}
+
+PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
+
+
+class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
+
+    def setUp(self):
+        # Enable the test mechanism driver to ensure that
+        # we can successfully call through to all mechanism
+        # driver apis.
+        config.cfg.CONF.set_override('mechanism_drivers',
+                                     ['openvswitch', 'linuxbridge',
+                                      'l2population'],
+                                     'ml2')
+        super(TestL2PopulationRpcTestCase, self).setUp(PLUGIN_NAME)
+        self.addCleanup(config.cfg.CONF.reset)
+        self.port_create_status = 'DOWN'
+
+        self.adminContext = context.get_admin_context()
+
+        self.type_manager = managers.TypeManager()
+        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
+        self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
+
+        self.orig_supported_agents = l2_consts.SUPPORTED_AGENT_TYPES
+        l2_consts.SUPPORTED_AGENT_TYPES = [constants.AGENT_TYPE_OVS]
+
+        net_arg = {pnet.NETWORK_TYPE: 'vxlan',
+                   pnet.SEGMENTATION_ID: '1'}
+        self._network = self._make_network(self.fmt, 'net1', True,
+                                           arg_list=(pnet.NETWORK_TYPE,
+                                                     pnet.SEGMENTATION_ID,),
+                                           **net_arg)
+
+        notifier_patch = mock.patch(NOTIFIER)
+        notifier_patch.start()
+
+        self.fanout_topic = topics.get_topic_name(topics.AGENT,
+                                                  topics.L2POPULATION,
+                                                  topics.UPDATE)
+        fanout = ('neutron.openstack.common.rpc.proxy.RpcProxy.fanout_cast')
+        fanout_patch = mock.patch(fanout)
+        self.mock_fanout = fanout_patch.start()
+
+        cast = ('neutron.openstack.common.rpc.proxy.RpcProxy.cast')
+        cast_patch = mock.patch(cast)
+        self.mock_cast = cast_patch.start()
+
+        uptime = ('neutron.plugins.ml2.drivers.l2pop.db.L2populationDbMixin.'
+                  'get_agent_uptime')
+        uptime_patch = mock.patch(uptime, return_value=190)
+        uptime_patch.start()
+
+        self.addCleanup(mock.patch.stopall)
+        self.addCleanup(db_api.clear_db)
+
+    def tearDown(self):
+        l2_consts.SUPPORTED_AGENT_TYPES = self.orig_supported_agents
+        super(TestL2PopulationRpcTestCase, self).tearDown()
+
+    def _register_ml2_agents(self):
+        callback = agents_db.AgentExtRpcCallback()
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': L2_AGENT},
+                              time=timeutils.strtime())
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': L2_AGENT_2},
+                              time=timeutils.strtime())
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': L2_AGENT_3},
+                              time=timeutils.strtime())
+
+    def test_fdb_add_called(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg) as port1:
+                with self.port(subnet=subnet,
+                               arg_list=(portbindings.HOST_ID,),
+                               **host_arg):
+                    p1 = port1['port']
+
+                    device = 'tap' + p1['id']
+
+                    self.mock_fanout.reset_mock()
+                    self.callbacks.update_device_up(self.adminContext,
+                                                    agent_id=HOST,
+                                                    device=device)
+
+                    p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+                    expected = {'args':
+                                {'fdb_entries':
+                                 {p1['network_id']:
+                                  {'ports':
+                                   {'20.0.0.1': [[p1['mac_address'],
+                                                  p1_ips[0]]]},
+                                   'network_type': 'vxlan',
+                                   'segment_id': 1}}},
+                                'namespace': None,
+                                'method': 'add_fdb_entries'}
+
+                    self.mock_fanout.assert_called_with(
+                        mock.ANY, expected, topic=self.fanout_topic)
+
+    def test_fdb_add_not_called_type_local(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST + '_3'}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg) as port1:
+                with self.port(subnet=subnet,
+                               arg_list=(portbindings.HOST_ID,),
+                               **host_arg):
+                    p1 = port1['port']
+
+                    device = 'tap' + p1['id']
+
+                    self.mock_fanout.reset_mock()
+                    self.callbacks.update_device_up(self.adminContext,
+                                                    agent_id=HOST,
+                                                    device=device)
+
+                    self.assertFalse(self.mock_fanout.called)
+
+    def test_fdb_add_two_agents(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST,
+                        'admin_state_up': True}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID, 'admin_state_up',),
+                           **host_arg) as port1:
+                host_arg = {portbindings.HOST_ID: HOST + '_2',
+                            'admin_state_up': True}
+                with self.port(subnet=subnet,
+                               arg_list=(portbindings.HOST_ID,
+                                         'admin_state_up',),
+                               **host_arg) as port2:
+                    p1 = port1['port']
+                    p2 = port2['port']
+
+                    device = 'tap' + p1['id']
+
+                    self.mock_cast.reset_mock()
+                    self.mock_fanout.reset_mock()
+                    self.callbacks.update_device_up(self.adminContext,
+                                                    agent_id=HOST,
+                                                    device=device)
+
+                    p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+                    p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
+
+                    expected1 = {'args':
+                                 {'fdb_entries':
+                                  {p1['network_id']:
+                                   {'ports':
+                                    {'20.0.0.2': [constants.FLOODING_ENTRY,
+                                                  [p2['mac_address'],
+                                                   p2_ips[0]]]},
+                                    'network_type': 'vxlan',
+                                    'segment_id': 1}}},
+                                 'namespace': None,
+                                 'method': 'add_fdb_entries'}
+
+                    topic = topics.get_topic_name(topics.AGENT,
+                                                  topics.L2POPULATION,
+                                                  topics.UPDATE,
+                                                  HOST)
+
+                    self.mock_cast.assert_called_with(mock.ANY,
+                                                      expected1,
+                                                      topic=topic)
+
+                    expected2 = {'args':
+                                 {'fdb_entries':
+                                  {p1['network_id']:
+                                   {'ports':
+                                    {'20.0.0.1': [constants.FLOODING_ENTRY,
+                                                  [p1['mac_address'],
+                                                   p1_ips[0]]]},
+                                    'network_type': 'vxlan',
+                                    'segment_id': 1}}},
+                                 'namespace': None,
+                                 'method': 'add_fdb_entries'}
+
+                    self.mock_fanout.assert_called_with(
+                        mock.ANY, expected2, topic=self.fanout_topic)
+
+    def test_fdb_add_called_two_networks(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg) as port1:
+                with self.subnet(cidr='10.1.0.0/24') as subnet2:
+                    host_arg = {portbindings.HOST_ID: HOST + '_2'}
+                    with self.port(subnet=subnet2,
+                                   arg_list=(portbindings.HOST_ID,),
+                                   **host_arg):
+                        p1 = port1['port']
+
+                        device = 'tap' + p1['id']
+
+                        self.mock_fanout.reset_mock()
+                        self.callbacks.update_device_up(self.adminContext,
+                                                        agent_id=HOST,
+                                                        device=device)
+
+                        p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+                        expected = {'args':
+                                    {'fdb_entries':
+                                     {p1['network_id']:
+                                      {'ports':
+                                       {'20.0.0.1': [constants.FLOODING_ENTRY,
+                                                     [p1['mac_address'],
+                                                      p1_ips[0]]]},
+                                       'network_type': 'vxlan',
+                                       'segment_id': 1}}},
+                                    'namespace': None,
+                                    'method': 'add_fdb_entries'}
+
+                        self.mock_fanout.assert_called_with(
+                            mock.ANY, expected, topic=self.fanout_topic)
+
+    def test_fdb_remove_called_from_rpc(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg):
+                with self.port(subnet=subnet,
+                               arg_list=(portbindings.HOST_ID,),
+                               **host_arg) as port:
+                    p1 = port['port']
+
+                    device = 'tap' + p1['id']
+
+                    self.mock_fanout.reset_mock()
+                    self.callbacks.update_device_up(self.adminContext,
+                                                    agent_id=HOST,
+                                                    device=device)
+
+                    self.callbacks.update_device_down(self.adminContext,
+                                                      agent_id=HOST,
+                                                      device=device)
+
+                    p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+                    expected = {'args':
+                                {'fdb_entries':
+                                 {p1['network_id']:
+                                  {'ports':
+                                   {'20.0.0.1': [[p1['mac_address'],
+                                                  p1_ips[0]]]},
+                                   'network_type': 'vxlan',
+                                   'segment_id': 1}}},
+                                'namespace': None,
+                                'method': 'remove_fdb_entries'}
+
+                    self.mock_fanout.assert_called_with(
+                        mock.ANY, expected, topic=self.fanout_topic)
+
+    def test_fdb_remove_called(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg):
+
+                with self.port(subnet=subnet,
+                               arg_list=(portbindings.HOST_ID,),
+                               **host_arg) as port:
+                    p1 = port['port']
+
+                    device = 'tap' + p1['id']
+
+                    self.mock_fanout.reset_mock()
+                    self.callbacks.update_device_up(self.adminContext,
+                                                    agent_id=HOST,
+                                                    device=device)
+
+                p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+                expected = {'args':
+                            {'fdb_entries':
+                             {p1['network_id']:
+                              {'ports':
+                               {'20.0.0.1': [[p1['mac_address'],
+                                              p1_ips[0]]]},
+                               'network_type': 'vxlan',
+                               'segment_id': 1}}},
+                            'namespace': None,
+                            'method': 'remove_fdb_entries'}
+
+                self.mock_fanout.assert_any_call(
+                    mock.ANY, expected, topic=self.fanout_topic)
+
+    def test_fdb_remove_called_last_port(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+
+            with self.port(subnet=subnet,
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg) as port:
+                p1 = port['port']
+
+                device = 'tap' + p1['id']
+
+                self.callbacks.update_device_up(self.adminContext,
+                                                agent_id=HOST,
+                                                device=device)
+
+            p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+            expected = {'args':
+                        {'fdb_entries':
+                         {p1['network_id']:
+                          {'ports':
+                           {'20.0.0.1': [constants.FLOODING_ENTRY,
+                                         [p1['mac_address'],
+                                          p1_ips[0]]]},
+                           'network_type': 'vxlan',
+                           'segment_id': 1}}},
+                        'namespace': None,
+                        'method': 'remove_fdb_entries'}
+
+            self.mock_fanout.assert_any_call(
+                mock.ANY, expected, topic=self.fanout_topic)
index 546c56c4dc84942f9f9802769cbe5f2a24cd0e65..3cdd9d9eefdb0bae748fe05a941b5288c1b53867 100644 (file)
@@ -184,8 +184,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                               return_value=details),
             mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
                               return_value=port),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
             mock.patch.object(self.agent, func_name)
-        ) as (get_dev_fn, get_vif_func, func):
+        ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
             self.assertFalse(self.agent.treat_devices_added([{}]))
         return func.called
 
index 7327b53da8633ae0af977ee03836d010169198fe..35e9b75fb84d22792d8975832d096af5ef83eea7 100644 (file)
@@ -95,3 +95,19 @@ class AgentRPCMethods(base.BaseTestCase):
         with mock.patch(call_to_patch) as create_connection:
             rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
             create_connection.assert_has_calls(expected)
+
+    def test_create_consumers_with_node_name(self):
+        dispatcher = mock.Mock()
+        expected = [
+            mock.call(new=True),
+            mock.call().create_consumer('foo-topic-op', dispatcher,
+                                        fanout=True),
+            mock.call().create_consumer('foo-topic-op.node1', dispatcher,
+                                        fanout=False),
+            mock.call().consume_in_thread()
+        ]
+
+        call_to_patch = 'neutron.openstack.common.rpc.create_connection'
+        with mock.patch(call_to_patch) as create_connection:
+            rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
+            create_connection.assert_has_calls(expected)
index 2f1ebfc2c6187f741271991d963a3cbeec7dc029..4876309e127cbc40ce84347a576bbb9a616f4cc4 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,7 +51,7 @@ data_files =
     etc/neutron/plugins/linuxbridge = etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini
     etc/neutron/plugins/metaplugin = etc/neutron/plugins/metaplugin/metaplugin.ini
     etc/neutron/plugins/midonet = etc/neutron/plugins/midonet/midonet.ini
-    etc/neutron/plugins/ml2 = 
+    etc/neutron/plugins/ml2 =
         etc/neutron/plugins/ml2/ml2_conf.ini
         etc/neutron/plugins/ml2/ml2_conf_arista.ini
         etc/neutron/plugins/ml2/ml2_conf_cisco.ini
@@ -129,6 +129,7 @@ neutron.ml2.mechanism_drivers =
     ncs = neutron.plugins.ml2.drivers.mechanism_ncs:NCSMechanismDriver
     arista = neutron.plugins.ml2.drivers.mech_arista.mechanism_arista:AristaDriver
     cisco_nexus = neutron.plugins.ml2.drivers.cisco.mech_cisco_nexus:CiscoNexusMechanismDriver
+    l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
 
 [build_sphinx]
 all_files = 1