--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+import abc
+
+from oslo.config import cfg
+
+from neutron.common import log
+
+
+class L2populationRpcCallBackMixin(object):
+ __metaclass__ = abc.ABCMeta
+
+ @log.log
+ def add_fdb_entries(self, context, fdb_entries, host=None):
+ if not host or host == cfg.CONF.host:
+ self.fdb_add(context, fdb_entries)
+
+ @log.log
+ def remove_fdb_entries(self, context, fdb_entries, host=None):
+ if not host or host == cfg.CONF.host:
+ self.fdb_remove(context, fdb_entries)
+
+ @abc.abstractmethod
+ def fdb_add(self, context, fdb_entries):
+ pass
+
+ @abc.abstractmethod
+ def fdb_remove(self, context, fdb_entries):
+ pass
# License for the specific language governing permissions and limitations
# under the License.
+import itertools
+
from neutron.common import topics
from neutron.openstack.common import log as logging
:param dispatcher: The dispatcher to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
- :param topic_details: A list of topics. Each topic has a name and a
- operation.
+ :param topic_details: A list of topics. Each topic has a name, an
+ operation, and an optional host param keying the
+ subscription to topic.host for plugin calls.
:returns: A common Connection.
"""
connection = rpc.create_connection(new=True)
- for topic, operation in topic_details:
+ for details in topic_details:
+ topic, operation, node_name = itertools.islice(
+ itertools.chain(details, [None]), 3)
+
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, dispatcher, fanout=True)
+ if node_name:
+ node_topic_name = '%s.%s' % (topic_name, node_name)
+ connection.create_consumer(node_topic_name,
+ dispatcher,
+ fanout=False)
connection.consume_in_thread()
return connection
MIN_VLAN_TAG = 1
MAX_VLAN_TAG = 4094
+FLOODING_ENTRY = ['00:00:00:00:00:00', '0.0.0.0']
EXT_NS_COMP = '_backward_comp_e_ns'
EXT_NS = '_extension_ns'
SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
+L2POPULATION = 'l2population'
CREATE = 'create'
DELETE = 'delete'
METERING_AGENT = 'metering_agent'
-def get_topic_name(prefix, table, operation):
+def get_topic_name(prefix, table, operation, host=None):
"""Create a topic name.
The topic name needs to be synced between the agent and the
:param table: The table in question (NETWORK, SUBNET, PORT).
:param operation: The operation that invokes notification (CREATE,
DELETE, UPDATE)
+ :param host: Add host to the topic
:returns: The topic name.
"""
+ if host:
+ return '%s-%s-%s.%s' % (prefix, table, operation, host)
return '%s-%s-%s' % (prefix, table, operation)
details['physical_network'],
segmentation_id,
details['port_id'])
+
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context,
+ device,
+ self.agent_id)
else:
self.remove_port_binding(details['network_id'],
details['port_id'])
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from oslo.config import cfg
+
+
+l2_population_options = [
+ cfg.IntOpt('agent_boot_time', default=180,
+ help=_('Delay within which agent is expected to update '
+ 'existing ports whent it restarts')),
+]
+
+cfg.CONF.register_opts(l2_population_options, "l2pop")
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+SUPPORTED_AGENT_TYPES = []
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from neutron.db import agents_db
+from neutron.db import db_base_plugin_v2 as base_db
+from neutron.db import models_v2
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import timeutils
+from neutron.plugins.ml2.drivers.l2pop import constants as l2_const
+from neutron.plugins.ml2 import models as ml2_models
+
+
+class L2populationDbMixin(base_db.CommonDbMixin):
+
+ def get_agent_ip_by_host(self, session, agent_host):
+ agent = self.get_agent_by_host(session, agent_host)
+ if agent:
+ return self.get_agent_ip(agent)
+
+ def get_agent_ip(self, agent):
+ configuration = jsonutils.loads(agent.configurations)
+ return configuration.get('tunneling_ip')
+
+ def get_agent_uptime(self, agent):
+ return timeutils.delta_seconds(agent.started_at,
+ agent.heartbeat_timestamp)
+
+ def get_agent_tunnel_types(self, agent):
+ configuration = jsonutils.loads(agent.configurations)
+ return configuration.get('tunnel_types')
+
+ def get_agent_by_host(self, session, agent_host):
+ with session.begin(subtransactions=True):
+ query = session.query(agents_db.Agent)
+ query = query.filter(agents_db.Agent.host == agent_host,
+ agents_db.Agent.agent_type.in_(
+ l2_const.SUPPORTED_AGENT_TYPES))
+ return query.first()
+
+ def get_network_ports(self, session, network_id):
+ with session.begin(subtransactions=True):
+ query = session.query(ml2_models.PortBinding,
+ agents_db.Agent)
+ query = query.join(agents_db.Agent,
+ agents_db.Agent.host ==
+ ml2_models.PortBinding.host)
+ query = query.filter(models_v2.Port.network_id == network_id,
+ models_v2.Port.admin_state_up == True,
+ agents_db.Agent.agent_type.in_(
+ l2_const.SUPPORTED_AGENT_TYPES))
+ return query
+
+ def get_agent_network_port_count(self, session, agent_host, network_id):
+ with session.begin(subtransactions=True):
+ query = session.query(models_v2.Port)
+
+ query = query.join(ml2_models.PortBinding)
+ query = query.filter(models_v2.Port.network_id == network_id,
+ ml2_models.PortBinding.host == agent_host)
+ return query.count()
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from oslo.config import cfg
+
+from neutron.common import constants as const
+from neutron import context as n_context
+from neutron.db import api as db_api
+from neutron.openstack.common import log as logging
+from neutron.plugins.ml2 import driver_api as api
+from neutron.plugins.ml2.drivers.l2pop import config # noqa
+from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
+from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
+
+LOG = logging.getLogger(__name__)
+
+
+class L2populationMechanismDriver(api.MechanismDriver,
+ l2pop_db.L2populationDbMixin):
+
+ def initialize(self):
+ LOG.debug(_("Experimental L2 population driver"))
+
+ def _get_port_fdb_entries(self, port):
+ return [[port['mac_address'],
+ ip['ip_address']] for ip in port['fixed_ips']]
+
+ def delete_port_precommit(self, context):
+ self.remove_fdb_entries = self._update_port_down(context)
+
+ def delete_port_postcommit(self, context):
+ self._notify_remove_fdb_entries(context,
+ self.remove_fdb_entries)
+
+ def _notify_remove_fdb_entries(self, context, fdb_entries):
+ rpc_ctx = n_context.get_admin_context_without_session()
+ l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+ rpc_ctx, fdb_entries)
+
+ def update_port_postcommit(self, context):
+ port = context.current
+ orig = context.original
+
+ if port['status'] == orig['status']:
+ return
+
+ if port['status'] == const.PORT_STATUS_ACTIVE:
+ self._update_port_up(context)
+ elif port['status'] == const.PORT_STATUS_DOWN:
+ fdb_entries = self._update_port_down(context)
+ self._notify_remove_fdb_entries(context, fdb_entries)
+
+ def _update_port_up(self, context):
+ port_context = context.current
+ network_id = port_context['network_id']
+ agent_host = port_context['binding:host_id']
+ if not agent_host:
+ return
+
+ session = db_api.get_session()
+ agent = self.get_agent_by_host(session, agent_host)
+ if not agent:
+ return
+
+ agent_ip = self.get_agent_ip(agent)
+ if not agent_ip:
+ LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
+ agent_host)
+ return
+
+ segment = context.bound_segment
+ if not segment:
+ LOG.warning(_("Port %(port)s updated by agent %(agent)s "
+ "isn't bound to any segment"),
+ {'port': port_context['id'], 'agent': agent.host})
+ return
+
+ tunnel_types = self.get_agent_tunnel_types(agent)
+ if segment['network_type'] not in tunnel_types:
+ return
+
+ agent_ports = self.get_agent_network_port_count(session, agent_host,
+ network_id)
+
+ rpc_ctx = n_context.get_admin_context_without_session()
+
+ other_fdb_entries = {network_id:
+ {'segment_id': segment['segmentation_id'],
+ 'network_type': segment['network_type'],
+ 'ports': {agent_ip: []}}}
+
+ if agent_ports == 1 or (
+ self.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time):
+ # First port plugged on current agent in this network,
+ # we have to provide it with the whole list of fdb entries
+ agent_fdb_entries = {network_id:
+ {'segment_id': segment['segmentation_id'],
+ 'network_type': segment['network_type'],
+ 'ports': {}}}
+ ports = agent_fdb_entries[network_id]['ports']
+
+ network_ports = self.get_network_ports(session, network_id)
+ for network_port in network_ports:
+ binding, agent = network_port
+ if agent.host == agent_host:
+ continue
+
+ ip = self.get_agent_ip(agent)
+ if not ip:
+ LOG.debug(_("Unable to retrieve the agent ip, check "
+ "the agent %(agent_host)s configuration."),
+ {'agent_host': agent.host})
+ continue
+
+ agent_ports = ports.get(ip, [const.FLOODING_ENTRY])
+ agent_ports += self._get_port_fdb_entries(binding.port)
+ ports[ip] = agent_ports
+
+ # And notify other agents to add flooding entry
+ other_fdb_entries[network_id]['ports'][agent_ip].append(
+ const.FLOODING_ENTRY)
+
+ if ports.keys():
+ l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
+ rpc_ctx, agent_fdb_entries, agent_host)
+
+ # Notify other agents to add fdb rule for current port
+ fdb_entries = self._get_port_fdb_entries(port_context)
+ other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+
+ l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
+ other_fdb_entries)
+
+ def _update_port_down(self, context):
+ port_context = context.current
+ network_id = port_context['network_id']
+
+ agent_host = port_context['binding:host_id']
+ if not agent_host:
+ return
+
+ session = db_api.get_session()
+ agent = self.get_agent_by_host(session, agent_host)
+ if not agent:
+ return
+
+ agent_ip = self.get_agent_ip(agent)
+ if not agent_ip:
+ LOG.warning(_("Unable to retrieve the agent ip, check the agent "
+ "configuration."))
+ return
+
+ segment = context.bound_segment
+ if not segment:
+ LOG.warning(_("Port %(port)s updated by agent %(agent)s "
+ "isn't bound to any segment"),
+ {'port': port_context['id'], 'agent': agent})
+ return
+
+ tunnel_types = self.get_agent_tunnel_types(agent)
+ if segment['network_type'] not in tunnel_types:
+ return
+
+ agent_ports = self.get_agent_network_port_count(session, agent_host,
+ network_id)
+
+ other_fdb_entries = {network_id:
+ {'segment_id': segment['segmentation_id'],
+ 'network_type': segment['network_type'],
+ 'ports': {agent_ip: []}}}
+
+ if agent_ports == 1:
+ # Agent is removing its last port in this network,
+ # other agents needs to be notified to delete their flooding entry.
+ other_fdb_entries[network_id]['ports'][agent_ip].append(
+ const.FLOODING_ENTRY)
+
+ # Notify other agents to remove fdb rule for current port
+ fdb_entries = self._get_port_fdb_entries(port_context)
+ other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+
+ return other_fdb_entries
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+from neutron.common import topics
+from neutron.openstack.common import log as logging
+from neutron.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L2populationAgentNotifyAPI(proxy.RpcProxy):
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic=topics.AGENT):
+ super(L2populationAgentNotifyAPI, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ self.topic_l2pop_update = topics.get_topic_name(topic,
+ topics.L2POPULATION,
+ topics.UPDATE)
+
+ def _notification_fanout(self, context, method, fdb_entries):
+ LOG.debug(_('Fanout notify l2population agents at %(topic)s '
+ 'the message %(method)s with %(fdb_entries)s'),
+ {'topic': self.topic,
+ 'method': method,
+ 'fdb_entries': fdb_entries})
+
+ self.fanout_cast(context,
+ self.make_msg(method, fdb_entries=fdb_entries),
+ topic=self.topic_l2pop_update)
+
+ def _notification_host(self, context, method, fdb_entries, host):
+ LOG.debug(_('Notify l2population agent %(host)s at %(topic)s the '
+ 'message %(method)s with %(fdb_entries)s'),
+ {'host': host,
+ 'topic': self.topic,
+ 'method': method,
+ 'fdb_entries': fdb_entries})
+ self.cast(context,
+ self.make_msg(method, fdb_entries=fdb_entries),
+ topic='%s.%s' % (self.topic_l2pop_update, host))
+
+ def add_fdb_entries(self, context, fdb_entries, host=None):
+ if fdb_entries:
+ if host:
+ self._notification_host(context, 'add_fdb_entries',
+ fdb_entries, host)
+ else:
+ self._notification_fanout(context, 'add_fdb_entries',
+ fdb_entries)
+
+ def remove_fdb_entries(self, context, fdb_entries, host=None):
+ if fdb_entries:
+ if host:
+ self._notification_host(context, 'remove_fdb_entries',
+ fdb_entries, host)
+ else:
+ self._notification_fanout(context, 'remove_fdb_entries',
+ fdb_entries)
+
+L2populationAgentNotify = L2populationAgentNotifyAPI()
network = self.get_network(context, port['network_id'])
mech_context = driver_context.PortContext(self, context, port,
network)
- self._delete_port_binding(mech_context)
self.mechanism_manager.delete_port_precommit(mech_context)
+ self._delete_port_binding(mech_context)
self._delete_port_security_group_bindings(context, id)
super(Ml2Plugin, self).delete_port(context, id)
# fact that an error occurred.
pass
self.notify_security_groups_member_updated(context, port)
+
+ def update_port_status(self, context, port_id, status):
+ updated = False
+ session = context.session
+ with session.begin(subtransactions=True):
+ port = db.get_port(session, port_id)
+ if not port:
+ LOG.warning(_("Port %(port)s updated up by agent not found"),
+ {'port': port_id})
+ return False
+
+ if port.status != status:
+ original_port = self._make_port_dict(port)
+ port.status = status
+ updated_port = self._make_port_dict(port)
+ network = self.get_network(context,
+ original_port['network_id'])
+ mech_context = driver_context.PortContext(
+ self, context, updated_port, network,
+ original_port=original_port)
+ self.mechanism_manager.update_port_precommit(mech_context)
+ updated = True
+
+ if updated:
+ self.mechanism_manager.update_port_postcommit(mech_context)
+
+ return True
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
+from neutron import manager
from neutron.openstack.common import log
from neutron.openstack.common.rpc import proxy
from neutron.plugins.ml2 import db
'vif_type': binding.vif_type})
return {'device': device}
- new_status = (q_const.PORT_STATUS_ACTIVE if port.admin_state_up
+ new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
else q_const.PORT_STATUS_DOWN)
if port.status != new_status:
port.status = new_status
{'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
- session = db_api.get_session()
- with session.begin(subtransactions=True):
- port = db.get_port(session, port_id)
- if not port:
- LOG.warning(_("Device %(device)s updated down by agent "
- "%(agent_id)s not found in database"),
- {'device': device, 'agent_id': agent_id})
- return {'device': device,
- 'exists': False}
- if port.status != q_const.PORT_STATUS_DOWN:
- port.status = q_const.PORT_STATUS_DOWN
- return {'device': device,
- 'exists': True}
+ plugin = manager.NeutronManager.get_plugin()
+ port_exists = plugin.update_port_status(rpc_context, port_id,
+ q_const.PORT_STATUS_DOWN)
+
+ return {'device': device,
+ 'exists': port_exists}
def update_device_up(self, rpc_context, **kwargs):
"""Device is up on agent."""
{'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
- session = db_api.get_session()
- with session.begin(subtransactions=True):
- port = db.get_port(session, port_id)
- if not port:
- LOG.warning(_("Device %(device)s updated up by agent "
- "%(agent_id)s not found in database"),
- {'device': device, 'agent_id': agent_id})
- if port.status != q_const.PORT_STATUS_ACTIVE:
- port.status = q_const.PORT_STATUS_ACTIVE
+ plugin = manager.NeutronManager.get_plugin()
+ plugin.update_port_status(rpc_context, port_id,
+ q_const.PORT_STATUS_ACTIVE)
class AgentNotifierApi(proxy.RpcProxy,
details['physical_network'],
details['segmentation_id'],
details['admin_state_up'])
+
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context,
+ device,
+ self.agent_id)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
{'device': device, 'e': e})
resync = True
continue
+
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context,
+ device,
+ self.agent_id)
return resync
def treat_devices_removed(self, devices):
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Sylvain Afchain, eNovance SAS
+# @author: Francois Eleouet, Orange
+# @author: Mathieu Rohon, Orange
+
+import mock
+
+from neutron.common import constants
+from neutron.common import topics
+from neutron import context
+from neutron.db import agents_db
+from neutron.db import api as db_api
+from neutron.extensions import portbindings
+from neutron.extensions import providernet as pnet
+from neutron.openstack.common import timeutils
+from neutron.plugins.ml2 import config as config
+from neutron.plugins.ml2.drivers.l2pop import constants as l2_consts
+from neutron.plugins.ml2 import managers
+from neutron.plugins.ml2 import rpc
+from neutron.tests.unit import test_db_plugin as test_plugin
+
+HOST = 'my_l2_host'
+L2_AGENT = {
+ 'binary': 'neutron-openvswitch-agent',
+ 'host': HOST,
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': {'tunneling_ip': '20.0.0.1',
+ 'tunnel_types': ['vxlan']},
+ 'agent_type': constants.AGENT_TYPE_OVS,
+ 'tunnel_type': [],
+ 'start_flag': True
+}
+
+L2_AGENT_2 = {
+ 'binary': 'neutron-openvswitch-agent',
+ 'host': HOST + '_2',
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': {'tunneling_ip': '20.0.0.2',
+ 'tunnel_types': ['vxlan']},
+ 'agent_type': constants.AGENT_TYPE_OVS,
+ 'tunnel_type': [],
+ 'start_flag': True
+}
+
+L2_AGENT_3 = {
+ 'binary': 'neutron-openvswitch-agent',
+ 'host': HOST + '_3',
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': {'tunneling_ip': '20.0.0.2',
+ 'tunnel_types': []},
+ 'agent_type': constants.AGENT_TYPE_OVS,
+ 'tunnel_type': [],
+ 'start_flag': True
+}
+
+PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
+
+
+class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
+
+ def setUp(self):
+ # Enable the test mechanism driver to ensure that
+ # we can successfully call through to all mechanism
+ # driver apis.
+ config.cfg.CONF.set_override('mechanism_drivers',
+ ['openvswitch', 'linuxbridge',
+ 'l2population'],
+ 'ml2')
+ super(TestL2PopulationRpcTestCase, self).setUp(PLUGIN_NAME)
+ self.addCleanup(config.cfg.CONF.reset)
+ self.port_create_status = 'DOWN'
+
+ self.adminContext = context.get_admin_context()
+
+ self.type_manager = managers.TypeManager()
+ self.notifier = rpc.AgentNotifierApi(topics.AGENT)
+ self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
+
+ self.orig_supported_agents = l2_consts.SUPPORTED_AGENT_TYPES
+ l2_consts.SUPPORTED_AGENT_TYPES = [constants.AGENT_TYPE_OVS]
+
+ net_arg = {pnet.NETWORK_TYPE: 'vxlan',
+ pnet.SEGMENTATION_ID: '1'}
+ self._network = self._make_network(self.fmt, 'net1', True,
+ arg_list=(pnet.NETWORK_TYPE,
+ pnet.SEGMENTATION_ID,),
+ **net_arg)
+
+ notifier_patch = mock.patch(NOTIFIER)
+ notifier_patch.start()
+
+ self.fanout_topic = topics.get_topic_name(topics.AGENT,
+ topics.L2POPULATION,
+ topics.UPDATE)
+ fanout = ('neutron.openstack.common.rpc.proxy.RpcProxy.fanout_cast')
+ fanout_patch = mock.patch(fanout)
+ self.mock_fanout = fanout_patch.start()
+
+ cast = ('neutron.openstack.common.rpc.proxy.RpcProxy.cast')
+ cast_patch = mock.patch(cast)
+ self.mock_cast = cast_patch.start()
+
+ uptime = ('neutron.plugins.ml2.drivers.l2pop.db.L2populationDbMixin.'
+ 'get_agent_uptime')
+ uptime_patch = mock.patch(uptime, return_value=190)
+ uptime_patch.start()
+
+ self.addCleanup(mock.patch.stopall)
+ self.addCleanup(db_api.clear_db)
+
+ def tearDown(self):
+ l2_consts.SUPPORTED_AGENT_TYPES = self.orig_supported_agents
+ super(TestL2PopulationRpcTestCase, self).tearDown()
+
+ def _register_ml2_agents(self):
+ callback = agents_db.AgentExtRpcCallback()
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': L2_AGENT},
+ time=timeutils.strtime())
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': L2_AGENT_2},
+ time=timeutils.strtime())
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': L2_AGENT_3},
+ time=timeutils.strtime())
+
+ def test_fdb_add_called(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg):
+ p1 = port1['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [[p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'add_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_fdb_add_not_called_type_local(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST + '_3'}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg):
+ p1 = port1['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ self.assertFalse(self.mock_fanout.called)
+
+ def test_fdb_add_two_agents(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST,
+ 'admin_state_up': True}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID, 'admin_state_up',),
+ **host_arg) as port1:
+ host_arg = {portbindings.HOST_ID: HOST + '_2',
+ 'admin_state_up': True}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,
+ 'admin_state_up',),
+ **host_arg) as port2:
+ p1 = port1['port']
+ p2 = port2['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_cast.reset_mock()
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
+
+ expected1 = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.2': [constants.FLOODING_ENTRY,
+ [p2['mac_address'],
+ p2_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'add_fdb_entries'}
+
+ topic = topics.get_topic_name(topics.AGENT,
+ topics.L2POPULATION,
+ topics.UPDATE,
+ HOST)
+
+ self.mock_cast.assert_called_with(mock.ANY,
+ expected1,
+ topic=topic)
+
+ expected2 = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ [p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'add_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected2, topic=self.fanout_topic)
+
+ def test_fdb_add_called_two_networks(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ with self.subnet(cidr='10.1.0.0/24') as subnet2:
+ host_arg = {portbindings.HOST_ID: HOST + '_2'}
+ with self.port(subnet=subnet2,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg):
+ p1 = port1['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ [p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'add_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_fdb_remove_called_from_rpc(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg):
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port:
+ p1 = port['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ self.callbacks.update_device_down(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [[p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'remove_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_fdb_remove_called(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg):
+
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port:
+ p1 = port['port']
+
+ device = 'tap' + p1['id']
+
+ self.mock_fanout.reset_mock()
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [[p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'remove_fdb_entries'}
+
+ self.mock_fanout.assert_any_call(
+ mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_fdb_remove_called_last_port(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+
+ with self.port(subnet=subnet,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port:
+ p1 = port['port']
+
+ device = 'tap' + p1['id']
+
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ [p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'remove_fdb_entries'}
+
+ self.mock_fanout.assert_any_call(
+ mock.ANY, expected, topic=self.fanout_topic)
return_value=details),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=port),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent, func_name)
- ) as (get_dev_fn, get_vif_func, func):
+ ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
self.assertFalse(self.agent.treat_devices_added([{}]))
return func.called
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
+
+ def test_create_consumers_with_node_name(self):
+ dispatcher = mock.Mock()
+ expected = [
+ mock.call(new=True),
+ mock.call().create_consumer('foo-topic-op', dispatcher,
+ fanout=True),
+ mock.call().create_consumer('foo-topic-op.node1', dispatcher,
+ fanout=False),
+ mock.call().consume_in_thread()
+ ]
+
+ call_to_patch = 'neutron.openstack.common.rpc.create_connection'
+ with mock.patch(call_to_patch) as create_connection:
+ rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
+ create_connection.assert_has_calls(expected)
etc/neutron/plugins/linuxbridge = etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini
etc/neutron/plugins/metaplugin = etc/neutron/plugins/metaplugin/metaplugin.ini
etc/neutron/plugins/midonet = etc/neutron/plugins/midonet/midonet.ini
- etc/neutron/plugins/ml2 =
+ etc/neutron/plugins/ml2 =
etc/neutron/plugins/ml2/ml2_conf.ini
etc/neutron/plugins/ml2/ml2_conf_arista.ini
etc/neutron/plugins/ml2/ml2_conf_cisco.ini
ncs = neutron.plugins.ml2.drivers.mechanism_ncs:NCSMechanismDriver
arista = neutron.plugins.ml2.drivers.mech_arista.mechanism_arista:AristaDriver
cisco_nexus = neutron.plugins.ml2.drivers.cisco.mech_cisco_nexus:CiscoNexusMechanismDriver
+ l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
[build_sphinx]
all_files = 1