]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Implement OpenFlow Agent mechanism driver
authorfumihiko kakuma <kakuma@valinux.co.jp>
Wed, 29 Jan 2014 01:54:12 +0000 (10:54 +0900)
committerfumihiko kakuma <kakuma@valinux.co.jp>
Sun, 2 Mar 2014 04:40:10 +0000 (13:40 +0900)
This adds ML2 mechanism driver controlling OpenFlow switches
and an agent using Ryu as OpenFlow Python library.
- An agent acts as an OpenFlow controller on each compute nodes.
- OpenFlow 1.3 (vendor agnostic unlike OVS extensions).

Implements: blueprint ryu-ml2-driver
Change-Id: I6a8168d24f911996639179d91c4da49151751057

19 files changed:
etc/neutron/plugins/ml2/ml2_conf_ofa.ini [new file with mode: 0644]
neutron/agent/linux/ovs_lib.py
neutron/common/constants.py
neutron/plugins/ml2/drivers/mech_ofagent.py [new file with mode: 0644]
neutron/plugins/ofagent/README [new file with mode: 0644]
neutron/plugins/ofagent/__init__.py [new file with mode: 0644]
neutron/plugins/ofagent/agent/__init__.py [new file with mode: 0644]
neutron/plugins/ofagent/agent/ofa_neutron_agent.py [new file with mode: 0644]
neutron/plugins/ofagent/common/__init__.py [new file with mode: 0644]
neutron/plugins/ofagent/common/config.py [new file with mode: 0644]
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/ml2/drivers/test_ofagent_mech.py [new file with mode: 0644]
neutron/tests/unit/ofagent/__init__.py [new file with mode: 0644]
neutron/tests/unit/ofagent/fake_oflib.py [new file with mode: 0644]
neutron/tests/unit/ofagent/test_ofa_defaults.py [new file with mode: 0644]
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py [new file with mode: 0644]
neutron/tests/unit/openvswitch/test_ovs_lib.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
setup.cfg

diff --git a/etc/neutron/plugins/ml2/ml2_conf_ofa.ini b/etc/neutron/plugins/ml2/ml2_conf_ofa.ini
new file mode 100644 (file)
index 0000000..4a94b98
--- /dev/null
@@ -0,0 +1,13 @@
+# Defines configuration options specific to the OpenFlow Agent Mechanism Driver
+
+[ovs]
+# Please refer to configuration options to the OpenvSwitch
+
+[agent]
+# (IntOpt) Number of seconds to retry acquiring an Open vSwitch datapath.
+# This is an optional parameter, default value is 60 seconds.
+#
+# get_datapath_retry_times =
+# Example: get_datapath_retry_times = 30
+
+# Please refer to configuration options to the OpenvSwitch else the above.
index c040d6ecc30c5ae6a4e6495fc6e97b6c9f6915e7..a6bddec3c04269e08d8d5e988e1060e09cdde766 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import distutils.version as dist_version
 import re
 
 from oslo.config import cfg
@@ -106,6 +107,27 @@ class OVSBridge(BaseOVS):
         self.defer_apply_flows = False
         self.deferred_flows = {'add': '', 'mod': '', 'del': ''}
 
+    def set_controller(self, controller_names):
+        vsctl_command = ['--', 'set-controller', self.br_name]
+        vsctl_command.extend(controller_names)
+        self.run_vsctl(vsctl_command, check_error=True)
+
+    def del_controller(self):
+        self.run_vsctl(['--', 'del-controller', self.br_name],
+                       check_error=True)
+
+    def get_controller(self):
+        res = self.run_vsctl(['--', 'get-controller', self.br_name],
+                             check_error=True)
+        if res:
+            return res.strip().split('\n')
+        return res
+
+    def set_protocols(self, protocols):
+        self.run_vsctl(['--', 'set', 'bridge', self.br_name,
+                        "protocols=%s" % protocols],
+                       check_error=True)
+
     def create(self):
         self.add_bridge(self.br_name)
 
@@ -401,7 +423,7 @@ class OVSBridge(BaseOVS):
             ofport = data[ofport_idx]
             # ofport must be integer otherwise return None
             if not isinstance(ofport, int) or ofport == -1:
-                LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a"
+                LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a "
                            "positive integer"), {'ofport': ofport,
                                                  'vif': port_id})
                 return
@@ -483,3 +505,44 @@ def get_bridge_external_bridge_id(root_helper, bridge):
     except Exception:
         LOG.exception(_("Bridge %s not found."), bridge)
         return None
+
+
+def _compare_installed_and_required_version(
+        installed_version, required_version, check_type, version_type):
+    if installed_version:
+        if dist_version.StrictVersion(
+                installed_version) < dist_version.StrictVersion(
+                required_version):
+            msg = (_('Failed %(ctype)s version check for Open '
+                     'vSwitch with %(vtype)s support. To use '
+                     '%(vtype)s tunnels with OVS, please ensure '
+                     'the OVS version is %(required)s or newer!') %
+                   {'ctype': check_type, 'vtype': version_type,
+                    'required': required_version})
+            raise SystemError(msg)
+    else:
+        msg = (_('Unable to determine %(ctype)s version for Open '
+                 'vSwitch with %(vtype)s support. To use '
+                 '%(vtype)s tunnels with OVS, please ensure '
+                 'that the version is %(required)s or newer!') %
+               {'ctype': check_type, 'vtype': version_type,
+                'required': required_version})
+        raise SystemError(msg)
+
+
+def check_ovs_vxlan_version(root_helper):
+    min_required_version = constants.MINIMUM_OVS_VXLAN_VERSION
+    installed_klm_version = get_installed_ovs_klm_version()
+    installed_usr_version = get_installed_ovs_usr_version(root_helper)
+    LOG.debug(_("Checking OVS version for VXLAN support "
+                "installed klm version is %s "), installed_klm_version)
+    LOG.debug(_("Checking OVS version for VXLAN support "
+                "installed usr version is %s"), installed_usr_version)
+    # First check the userspace version
+    _compare_installed_and_required_version(installed_usr_version,
+                                            min_required_version,
+                                            'userspace', 'VXLAN')
+    # Now check the kernel version
+    _compare_installed_and_required_version(installed_klm_version,
+                                            min_required_version,
+                                            'kernel', 'VXLAN')
index 754d252206cbe9c1daa4dd327bd36a79c3809aa4..fbc10e116f04e2443cfc4a27424f23015b6c23a6 100644 (file)
@@ -67,6 +67,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent'
 AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
 AGENT_TYPE_HYPERV = 'HyperV agent'
 AGENT_TYPE_NEC = 'NEC plugin agent'
+AGENT_TYPE_OFA = 'OFA driver agent'
 AGENT_TYPE_L3 = 'L3 agent'
 AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
 AGENT_TYPE_MLNX = 'Mellanox plugin agent'
diff --git a/neutron/plugins/ml2/drivers/mech_ofagent.py b/neutron/plugins/ml2/drivers/mech_ofagent.py
new file mode 100644 (file)
index 0000000..3d3909b
--- /dev/null
@@ -0,0 +1,60 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on openvswitch mechanism driver.
+#
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from neutron.common import constants
+from neutron.extensions import portbindings
+from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.ml2 import driver_api as api
+from neutron.plugins.ml2.drivers import mech_agent
+
+LOG = log.getLogger(__name__)
+
+
+class OfagentMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
+    """Attach to networks using ofagent L2 agent.
+
+    The OfagentMechanismDriver integrates the ml2 plugin with the
+    ofagent L2 agent. Port binding with this driver requires the
+    ofagent agent to be running on the port's host, and that agent
+    to have connectivity to at least one segment of the port's
+    network.
+    """
+
+    def __init__(self):
+        super(OfagentMechanismDriver, self).__init__(
+            constants.AGENT_TYPE_OFA,
+            portbindings.VIF_TYPE_OVS,
+            {portbindings.CAP_PORT_FILTER: True})
+
+    def check_segment_for_agent(self, segment, agent):
+        mappings = agent['configurations'].get('bridge_mappings', {})
+        tunnel_types = agent['configurations'].get('tunnel_types', [])
+        LOG.debug(_("Checking segment: %(segment)s "
+                    "for mappings: %(mappings)s "
+                    "with tunnel_types: %(tunnel_types)s"),
+                  {'segment': segment, 'mappings': mappings,
+                   'tunnel_types': tunnel_types})
+        network_type = segment[api.NETWORK_TYPE]
+        return (
+            network_type == p_const.TYPE_LOCAL or
+            network_type in tunnel_types or
+            (network_type in [p_const.TYPE_FLAT, p_const.TYPE_VLAN] and
+                segment[api.PHYSICAL_NETWORK] in mappings)
+        )
diff --git a/neutron/plugins/ofagent/README b/neutron/plugins/ofagent/README
new file mode 100644 (file)
index 0000000..a43b0dd
--- /dev/null
@@ -0,0 +1,21 @@
+This directory includes agent for OpenFlow Agent mechanism driver.
+
+# -- Installation
+
+For how to install/set up ML2 mechanism driver for OpenFlow Agent, please refer to
+https://github.com/osrg/ryu/wiki/OpenStack
+
+# -- Ryu General
+
+For general Ryu stuff, please refer to
+http://www.osrg.net/ryu/
+
+Ryu is available at github
+git://github.com/osrg/ryu.git
+https://github.com/osrg/ryu
+
+The mailing is at
+ryu-devel@lists.sourceforge.net
+https://lists.sourceforge.net/lists/listinfo/ryu-devel
+
+Enjoy!
diff --git a/neutron/plugins/ofagent/__init__.py b/neutron/plugins/ofagent/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/plugins/ofagent/agent/__init__.py b/neutron/plugins/ofagent/agent/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py
new file mode 100644 (file)
index 0000000..d428be2
--- /dev/null
@@ -0,0 +1,1381 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on openvswitch agent.
+#
+# Copyright 2011 Nicira Networks, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import time
+
+from oslo.config import cfg
+from ryu.app.ofctl import api as ryu_api
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import polling
+from neutron.agent.linux import utils
+from neutron.agent import rpc as agent_rpc
+from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.common import constants as n_const
+from neutron.common import topics
+from neutron.common import utils as n_utils
+from neutron import context
+from neutron.extensions import securitygroup as ext_sg
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.openstack.common.rpc import common as rpc_common
+from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.ofagent.common import config  # noqa
+from neutron.plugins.openvswitch.common import constants
+
+
+LOG = logging.getLogger(__name__)
+
+# A placeholder for dead vlans.
+DEAD_VLAN_TAG = str(n_const.MAX_VLAN_TAG + 1)
+
+
+# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
+# attributes set).
+class LocalVLANMapping:
+    def __init__(self, vlan, network_type, physical_network, segmentation_id,
+                 vif_ports=None):
+        if vif_ports is None:
+            vif_ports = {}
+        self.vlan = vlan
+        self.network_type = network_type
+        self.physical_network = physical_network
+        self.segmentation_id = segmentation_id
+        self.vif_ports = vif_ports
+        # set of tunnel ports on which packets should be flooded
+        self.tun_ofports = set()
+
+    def __str__(self):
+        return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
+                (self.vlan, self.network_type, self.physical_network,
+                 self.segmentation_id))
+
+
+class Port(object):
+    """Represents a neutron port.
+
+    Class stores port data in a ORM-free way, so attributres are
+    still available even if a row has been deleted.
+    """
+
+    def __init__(self, p):
+        self.id = p.id
+        self.network_id = p.network_id
+        self.device_id = p.device_id
+        self.admin_state_up = p.admin_state_up
+        self.status = p.status
+
+    def __eq__(self, other):
+        """Compare only fields that will cause us to re-wire."""
+        try:
+            return (other and self.id == other.id
+                    and self.admin_state_up == other.admin_state_up)
+        except Exception:
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self.id)
+
+
+class OVSBridge(ovs_lib.OVSBridge):
+    def __init__(self, br_name, root_helper, ryuapp):
+        super(OVSBridge, self).__init__(br_name, root_helper)
+        self.datapath_id = None
+        self.datapath = None
+        self.ofparser = None
+        self.ryuapp = ryuapp
+
+    def find_datapath_id(self):
+        self.datapath_id = self.get_datapath_id()
+
+    def get_datapath(self, retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
+        retry = 0
+        while self.datapath is None:
+            self.datapath = ryu_api.get_datapath(self.ryuapp,
+                                                 int(self.datapath_id, 16))
+            retry += 1
+            if retry >= retry_max:
+                LOG.error(_('Agent terminated!: Failed to get a datapath.'))
+                raise SystemExit(1)
+            time.sleep(1)
+        self.ofparser = self.datapath.ofproto_parser
+
+    def setup_ofp(self, controller_names=None,
+                  protocols='OpenFlow13',
+                  retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
+        if not controller_names:
+            host = cfg.CONF.ofp_listen_host
+            if not host:
+                # 127.0.0.1 is a default for agent style of controller
+                host = '127.0.0.1'
+            controller_names = ["tcp:%s:%d" % (host,
+                                               cfg.CONF.ofp_tcp_listen_port)]
+        try:
+            self.set_protocols(protocols)
+            self.set_controller(controller_names)
+        except RuntimeError:
+            LOG.exception(_("Agent terminated"))
+            raise SystemExit(1)
+        self.find_datapath_id()
+        self.get_datapath(retry_max)
+
+
+class OFAPluginApi(agent_rpc.PluginApi,
+                   sg_rpc.SecurityGroupServerRpcApiMixin):
+    pass
+
+
+class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
+    def __init__(self, context, plugin_rpc, root_helper):
+        self.context = context
+        self.plugin_rpc = plugin_rpc
+        self.root_helper = root_helper
+        self.init_firewall()
+
+
+class OFANeutronAgentRyuApp(app_manager.RyuApp):
+    OFP_VERSIONS = [ryu_ofp13.OFP_VERSION]
+
+    def start(self):
+
+        super(OFANeutronAgentRyuApp, self).start()
+        return hub.spawn(self._agent_main, self)
+
+    def _agent_main(self, ryuapp):
+        cfg.CONF.register_opts(ip_lib.OPTS)
+
+        try:
+            agent_config = create_agent_config_map(cfg.CONF)
+        except ValueError:
+            LOG.exception(_("Agent failed to create agent config map"))
+            raise SystemExit(1)
+
+        is_xen_compute_host = ('rootwrap-xen-dom0' in
+                               agent_config['root_helper'])
+        if is_xen_compute_host:
+            # Force ip_lib to always use the root helper to ensure that ip
+            # commands target xen dom0 rather than domU.
+            cfg.CONF.set_default('ip_lib_force_root', True)
+
+        agent = OFANeutronAgent(ryuapp, **agent_config)
+
+        # Start everything.
+        LOG.info(_("Agent initialized successfully, now running... "))
+        agent.daemon_loop()
+
+
+class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
+    """A agent for OpenFlow Agent ML2 mechanism driver.
+
+    OFANeutronAgent is a OpenFlow Agent agent for a ML2 plugin.
+    This is as a ryu application thread.
+    - An agent acts as an OpenFlow controller on each compute nodes.
+    - OpenFlow 1.3 (vendor agnostic unlike OVS extensions).
+    """
+
+    # history
+    #   1.0 Initial version
+    #   1.1 Support Security Group RPC
+    RPC_API_VERSION = '1.1'
+
+    def __init__(self, ryuapp, integ_br, tun_br, local_ip,
+                 bridge_mappings, root_helper,
+                 polling_interval, tunnel_types=None,
+                 veth_mtu=None, l2_population=False,
+                 minimize_polling=False,
+                 ovsdb_monitor_respawn_interval=(
+                     constants.DEFAULT_OVSDBMON_RESPAWN)):
+        """Constructor.
+
+        :param ryuapp: object of the ryu app.
+        :param integ_br: name of the integration bridge.
+        :param tun_br: name of the tunnel bridge.
+        :param local_ip: local IP address of this hypervisor.
+        :param bridge_mappings: mappings from physical network name to bridge.
+        :param root_helper: utility to use when running shell cmds.
+        :param polling_interval: interval (secs) to poll DB.
+        :param tunnel_types: A list of tunnel types to enable support for in
+               the agent. If set, will automatically set enable_tunneling to
+               True.
+        :param veth_mtu: MTU size for veth interfaces.
+        :param minimize_polling: Optional, whether to minimize polling by
+               monitoring ovsdb for interface changes.
+        :param ovsdb_monitor_respawn_interval: Optional, when using polling
+               minimization, the number of seconds to wait before respawning
+               the ovsdb monitor.
+        """
+        self.ryuapp = ryuapp
+        self.veth_mtu = veth_mtu
+        self.root_helper = root_helper
+        self.available_local_vlans = set(xrange(n_const.MIN_VLAN_TAG,
+                                                n_const.MAX_VLAN_TAG))
+        self.tunnel_types = tunnel_types or []
+        self.l2_pop = l2_population
+        self.agent_state = {
+            'binary': 'neutron-ofa-agent',
+            'host': cfg.CONF.host,
+            'topic': n_const.L2_AGENT_TOPIC,
+            'configurations': {'bridge_mappings': bridge_mappings,
+                               'tunnel_types': self.tunnel_types,
+                               'tunneling_ip': local_ip,
+                               'l2_population': self.l2_pop},
+            'agent_type': n_const.AGENT_TYPE_OFA,
+            'start_flag': True}
+
+        # Keep track of int_br's device count for use by _report_state()
+        self.int_br_device_count = 0
+
+        self.int_br = OVSBridge(integ_br, self.root_helper, self.ryuapp)
+        self.setup_rpc()
+        self.setup_integration_br()
+        self.setup_physical_bridges(bridge_mappings)
+        self.local_vlan_map = {}
+        self.tun_br_ofports = {p_const.TYPE_GRE: {},
+                               p_const.TYPE_VXLAN: {}}
+
+        self.polling_interval = polling_interval
+        self.minimize_polling = minimize_polling
+        self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
+
+        self.enable_tunneling = bool(self.tunnel_types)
+        self.local_ip = local_ip
+        self.tunnel_count = 0
+        self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
+        self._check_ovs_version()
+        if self.enable_tunneling:
+            self.setup_tunnel_br(tun_br)
+        # Collect additional bridges to monitor
+        self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
+
+        # Security group agent support
+        self.sg_agent = OFASecurityGroupAgent(self.context,
+                                              self.plugin_rpc,
+                                              self.root_helper)
+        # Initialize iteration counter
+        self.iter_num = 0
+
+    def _check_ovs_version(self):
+        if p_const.TYPE_VXLAN in self.tunnel_types:
+            try:
+                ovs_lib.check_ovs_vxlan_version(self.root_helper)
+            except SystemError:
+                LOG.exception(_("Agent terminated"))
+                raise SystemExit(1)
+
+    def _report_state(self):
+        # How many devices are likely used by a VM
+        self.agent_state.get('configurations')['devices'] = (
+            self.int_br_device_count)
+        try:
+            self.state_rpc.report_state(self.context,
+                                        self.agent_state)
+            self.agent_state.pop('start_flag', None)
+        except Exception:
+            LOG.exception(_("Failed reporting state!"))
+
+    def ryu_send_msg(self, msg):
+        result = ryu_api.send_msg(self.ryuapp, msg)
+        LOG.info(_("ryu send_msg() result: %s"), result)
+
+    def setup_rpc(self):
+        mac = self.int_br.get_local_port_mac()
+        self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
+        self.topic = topics.AGENT
+        self.plugin_rpc = OFAPluginApi(topics.PLUGIN)
+        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+
+        # RPC network init
+        self.context = context.get_admin_context_without_session()
+        # Handle updates from service
+        self.dispatcher = self.create_rpc_dispatcher()
+        # Define the listening consumers for the agent
+        consumers = [[topics.PORT, topics.UPDATE],
+                     [topics.NETWORK, topics.DELETE],
+                     [constants.TUNNEL, topics.UPDATE],
+                     [topics.SECURITY_GROUP, topics.UPDATE]]
+        self.connection = agent_rpc.create_consumers(self.dispatcher,
+                                                     self.topic,
+                                                     consumers)
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.FixedIntervalLoopingCall(
+                self._report_state)
+            heartbeat.start(interval=report_interval)
+
+    def get_net_uuid(self, vif_id):
+        for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+            if vif_id in vlan_mapping.vif_ports:
+                return network_id
+
+    def network_delete(self, context, **kwargs):
+        network_id = kwargs.get('network_id')
+        LOG.debug(_("network_delete received network %s"), network_id)
+        # The network may not be defined on this agent
+        lvm = self.local_vlan_map.get(network_id)
+        if lvm:
+            self.reclaim_local_vlan(network_id)
+        else:
+            LOG.debug(_("Network %s not used on agent."), network_id)
+
+    def port_update(self, context, **kwargs):
+        port = kwargs.get('port')
+        LOG.debug(_("port_update received port %s"), port['id'])
+        # Validate that port is on OVS
+        vif_port = self.int_br.get_vif_port_by_id(port['id'])
+        if not vif_port:
+            return
+
+        if ext_sg.SECURITYGROUPS in port:
+            self.sg_agent.refresh_firewall()
+        network_type = kwargs.get('network_type')
+        segmentation_id = kwargs.get('segmentation_id')
+        physical_network = kwargs.get('physical_network')
+        self.treat_vif_port(vif_port, port['id'], port['network_id'],
+                            network_type, physical_network,
+                            segmentation_id, port['admin_state_up'])
+        try:
+            if port['admin_state_up']:
+                # update plugin about port status
+                self.plugin_rpc.update_device_up(self.context, port['id'],
+                                                 self.agent_id,
+                                                 cfg.CONF.host)
+            else:
+                # update plugin about port status
+                self.plugin_rpc.update_device_down(self.context, port['id'],
+                                                   self.agent_id,
+                                                   cfg.CONF.host)
+        except rpc_common.Timeout:
+            LOG.error(_("RPC timeout while updating port %s"), port['id'])
+
+    def tunnel_update(self, context, **kwargs):
+        LOG.debug(_("tunnel_update received"))
+        if not self.enable_tunneling:
+            return
+        tunnel_ip = kwargs.get('tunnel_ip')
+        tunnel_id = kwargs.get('tunnel_id', tunnel_ip)
+        tunnel_type = kwargs.get('tunnel_type')
+        if not tunnel_type:
+            LOG.error(_("No tunnel_type specified, cannot create tunnels"))
+            return
+        if tunnel_type not in self.tunnel_types:
+            LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type)
+            return
+        if tunnel_ip == self.local_ip:
+            return
+        tun_name = '%s-%s' % (tunnel_type, tunnel_id)
+        self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
+
+    def create_rpc_dispatcher(self):
+        """Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        """
+        return dispatcher.RpcDispatcher([self])
+
+    def _provision_local_vlan_outbound_for_tunnel(self, lvid,
+                                                  segmentation_id, ofports):
+        br = self.tun_br
+        match = br.ofparser.OFPMatch(
+            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)
+        actions = [br.ofparser.OFPActionPopVlan(),
+                   br.ofparser.OFPActionSetField(
+                       tunnel_id=int(segmentation_id))]
+        for ofport in ofports:
+            actions.append(br.ofparser.OFPActionOutput(ofport, 0))
+        instructions = [br.ofparser.OFPInstructionActions(
+                        ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+        msg = br.ofparser.OFPFlowMod(
+            br.datapath,
+            table_id=constants.FLOOD_TO_TUN,
+            priority=1,
+            match=match, instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,
+                                                 segmentation_id):
+        br = self.tun_br
+        match = br.ofparser.OFPMatch(
+            tunnel_id=int(segmentation_id))
+        actions = [br.ofparser.OFPActionSetField(
+            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)]
+        instructions = [
+            br.ofparser.OFPInstructionActions(
+                ryu_ofp13.OFPIT_APPLY_ACTIONS, actions),
+            br.ofparser.OFPInstructionGotoTable(
+                table_id=constants.LEARN_FROM_TUN)]
+        msg = br.ofparser.OFPFlowMod(
+            br.datapath,
+            table_id=constants.TUN_TABLE[network_type],
+            priority=1,
+            match=match,
+            instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _local_vlan_for_tunnel(self, lvid, network_type, segmentation_id):
+        ofports = [int(ofport) for ofport in
+                   self.tun_br_ofports[network_type].values()]
+        if ofports:
+            self._provision_local_vlan_outbound_for_tunnel(
+                lvid, segmentation_id, ofports)
+        self._provision_local_vlan_inbound_for_tunnel(lvid, network_type,
+                                                      segmentation_id)
+
+    def _provision_local_vlan_outbound(self, br, lvid, actions,
+                                       physical_network):
+        match = br.ofparser.OFPMatch(
+            in_port=int(self.phys_ofports[physical_network]),
+            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)
+        instructions = [br.ofparser.OFPInstructionActions(
+            ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     priority=4,
+                                     match=match,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _provision_local_vlan_inbound(self, lvid, vlan_vid, physical_network):
+        match = self.int_br.ofparser.OFPMatch(
+            in_port=int(self.int_ofports[physical_network]),
+            vlan_vid=vlan_vid)
+        actions = [self.int_br.ofparser.OFPActionSetField(
+            vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT),
+            self.int_br.ofparser.OFPActionOutput(
+                ryu_ofp13.OFPP_NORMAL, 0)]
+        instructions = [self.int_br.ofparser.OFPInstructionActions(
+            ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+        msg = self.int_br.ofparser.OFPFlowMod(
+            self.int_br.datapath,
+            priority=3,
+            match=match,
+            instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _local_vlan_for_flat(self, lvid, physical_network):
+        br = self.phys_brs[physical_network]
+        actions = [br.ofparser.OFPActionPopVlan(),
+                   br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+        self._provision_local_vlan_outbound(
+            br, lvid, actions, physical_network)
+        self._provision_local_vlan_inbound(lvid, 0xffff, physical_network)
+
+    def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id):
+        br = self.phys_brs[physical_network]
+        actions = [br.ofparser.OFPActionSetField(
+            vlan_vid=int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT),
+            br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+        self._provision_local_vlan_outbound(
+            br, lvid, actions, physical_network)
+        self._provision_local_vlan_inbound(
+            lvid, int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT,
+            physical_network)
+
+    def provision_local_vlan(self, net_uuid, network_type, physical_network,
+                             segmentation_id):
+        """Provisions a local VLAN.
+
+        :param net_uuid: the uuid of the network associated with this vlan.
+        :param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat',
+                                               'local')
+        :param physical_network: the physical network for 'vlan' or 'flat'
+        :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+        """
+
+        if not self.available_local_vlans:
+            LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
+            return
+        lvid = self.available_local_vlans.pop()
+        LOG.info(_("Assigning %(vlan_id)s as local vlan for "
+                   "net-id=%(net_uuid)s"),
+                 {'vlan_id': lvid, 'net_uuid': net_uuid})
+        self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
+                                                         physical_network,
+                                                         segmentation_id)
+
+        if network_type in constants.TUNNEL_NETWORK_TYPES:
+            if self.enable_tunneling:
+                self._local_vlan_for_tunnel(lvid, network_type,
+                                            segmentation_id)
+            else:
+                LOG.error(_("Cannot provision %(network_type)s network for "
+                          "net-id=%(net_uuid)s - tunneling disabled"),
+                          {'network_type': network_type,
+                           'net_uuid': net_uuid})
+        elif network_type == p_const.TYPE_FLAT:
+            if physical_network in self.phys_brs:
+                self._local_vlan_for_flat(lvid, physical_network)
+            else:
+                LOG.error(_("Cannot provision flat network for "
+                            "net-id=%(net_uuid)s - no bridge for "
+                            "physical_network %(physical_network)s"),
+                          {'net_uuid': net_uuid,
+                           'physical_network': physical_network})
+        elif network_type == p_const.TYPE_VLAN:
+            if physical_network in self.phys_brs:
+                self._local_vlan_for_vlan(lvid, physical_network,
+                                          segmentation_id)
+            else:
+                LOG.error(_("Cannot provision VLAN network for "
+                            "net-id=%(net_uuid)s - no bridge for "
+                            "physical_network %(physical_network)s"),
+                          {'net_uuid': net_uuid,
+                           'physical_network': physical_network})
+        elif network_type == p_const.TYPE_LOCAL:
+            # no flows needed for local networks
+            pass
+        else:
+            LOG.error(_("Cannot provision unknown network type "
+                        "%(network_type)s for net-id=%(net_uuid)s"),
+                      {'network_type': network_type,
+                       'net_uuid': net_uuid})
+
+    def _reclaim_local_vlan_outbound(self, lvm):
+        br = self.phys_brs[lvm.physical_network]
+        match = br.ofparser.OFPMatch(
+            in_port=self.phys_ofports[lvm.physical_network],
+            vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=ryu_ofp13.OFPTT_ALL,
+                                     command=ryu_ofp13.OFPFC_DELETE,
+                                     out_group=ryu_ofp13.OFPG_ANY,
+                                     out_port=ryu_ofp13.OFPP_ANY,
+                                     match=match)
+        self.ryu_send_msg(msg)
+
+    def _reclaim_local_vlan_inbound(self, lvm, vlan_vid):
+        br = self.int_br
+        match = br.ofparser.OFPMatch(
+            in_port=self.int_ofports[lvm.physical_network],
+            vlan_vid=vlan_vid)
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=ryu_ofp13.OFPTT_ALL,
+                                     command=ryu_ofp13.OFPFC_DELETE,
+                                     out_group=ryu_ofp13.OFPG_ANY,
+                                     out_port=ryu_ofp13.OFPP_ANY,
+                                     match=match)
+        self.ryu_send_msg(msg)
+
+    def reclaim_local_vlan(self, net_uuid):
+        """Reclaim a local VLAN.
+
+        :param net_uuid: the network uuid associated with this vlan.
+        :param lvm: a LocalVLANMapping object that tracks (vlan, lsw_id,
+            vif_ids) mapping.
+        """
+        lvm = self.local_vlan_map.pop(net_uuid, None)
+        if lvm is None:
+            LOG.debug(_("Network %s not used on agent."), net_uuid)
+            return
+
+        LOG.info(_("Reclaiming vlan = %(vlan_id)s from net-id = %(net_uuid)s"),
+                 {'vlan_id': lvm.vlan,
+                  'net_uuid': net_uuid})
+
+        if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:
+            if self.enable_tunneling:
+                match = self.tun_br.ofparser.OFPMatch(
+                    tunnel_id=int(lvm.segmentation_id))
+                msg = self.tun_br.ofparser.OFPFlowMod(
+                    self.tun_br.datapath,
+                    table_id=constants.TUN_TABLE[lvm.network_type],
+                    command=ryu_ofp13.OFPFC_DELETE,
+                    out_group=ryu_ofp13.OFPG_ANY,
+                    out_port=ryu_ofp13.OFPP_ANY,
+                    match=match)
+                self.ryu_send_msg(msg)
+                match = self.tun_br.ofparser.OFPMatch(
+                    vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)
+                msg = self.tun_br.ofparser.OFPFlowMod(
+                    self.tun_br.datapath,
+                    table_id=ryu_ofp13.OFPTT_ALL,
+                    command=ryu_ofp13.OFPFC_DELETE,
+                    out_group=ryu_ofp13.OFPG_ANY,
+                    out_port=ryu_ofp13.OFPP_ANY,
+                    match=match)
+                self.ryu_send_msg(msg)
+        elif lvm.network_type == p_const.TYPE_FLAT:
+            if lvm.physical_network in self.phys_brs:
+                self._reclaim_local_vlan_outbound(lvm)
+                self._reclaim_local_vlan_inbound(lvm, 0xffff)
+        elif lvm.network_type == p_const.TYPE_VLAN:
+            if lvm.physical_network in self.phys_brs:
+                self._reclaim_local_vlan_outbound(lvm)
+                self._reclaim_local_vlan_inbound(
+                    lvm, lvm.segmentation_id | ryu_ofp13.OFPVID_PRESENT)
+        elif lvm.network_type == p_const.TYPE_LOCAL:
+            # no flows needed for local networks
+            pass
+        else:
+            LOG.error(_("Cannot reclaim unknown network type "
+                        "%(network_type)s for net-id=%(net_uuid)s"),
+                      {'network_type': lvm.network_type,
+                       'net_uuid': net_uuid})
+
+        self.available_local_vlans.add(lvm.vlan)
+
+    def port_bound(self, port, net_uuid,
+                   network_type, physical_network, segmentation_id):
+        """Bind port to net_uuid/lsw_id and install flow for inbound traffic
+        to vm.
+
+        :param port: a ovs_lib.VifPort object.
+        :param net_uuid: the net_uuid this port is to be associated with.
+        :param network_type: the network type ('gre', 'vlan', 'flat', 'local')
+        :param physical_network: the physical network for 'vlan' or 'flat'
+        :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+        """
+        if net_uuid not in self.local_vlan_map:
+            self.provision_local_vlan(net_uuid, network_type,
+                                      physical_network, segmentation_id)
+        lvm = self.local_vlan_map[net_uuid]
+        lvm.vif_ports[port.vif_id] = port
+
+        self.int_br.set_db_attribute("Port", port.port_name, "tag",
+                                     str(lvm.vlan))
+        if int(port.ofport) != -1:
+            match = self.int_br.ofparser.OFPMatch(in_port=port.ofport)
+            msg = self.int_br.ofparser.OFPFlowMod(
+                self.int_br.datapath,
+                table_id=ryu_ofp13.OFPTT_ALL,
+                command=ryu_ofp13.OFPFC_DELETE,
+                out_group=ryu_ofp13.OFPG_ANY,
+                out_port=ryu_ofp13.OFPP_ANY,
+                match=match)
+            self.ryu_send_msg(msg)
+
+    def port_unbound(self, vif_id, net_uuid=None):
+        """Unbind port.
+
+        Removes corresponding local vlan mapping object if this is its last
+        VIF.
+
+        :param vif_id: the id of the vif
+        :param net_uuid: the net_uuid this port is associated with.
+        """
+        net_uuid = net_uuid or self.get_net_uuid(vif_id)
+
+        if not self.local_vlan_map.get(net_uuid):
+            LOG.info(_('port_unbound() net_uuid %s not in local_vlan_map'),
+                     net_uuid)
+            return
+
+        lvm = self.local_vlan_map[net_uuid]
+        lvm.vif_ports.pop(vif_id, None)
+
+        if not lvm.vif_ports:
+            self.reclaim_local_vlan(net_uuid)
+
+    def port_dead(self, port):
+        """Once a port has no binding, put it on the "dead vlan".
+
+        :param port: a ovs_lib.VifPort object.
+        """
+        self.int_br.set_db_attribute("Port", port.port_name, "tag",
+                                     DEAD_VLAN_TAG)
+        match = self.int_br.ofparser.OFPMatch(in_port=int(port.ofport))
+        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+                                              priority=2, match=match)
+        self.ryu_send_msg(msg)
+
+    def setup_integration_br(self):
+        """Setup the integration bridge.
+
+        Create patch ports and remove all existing flows.
+
+        :param bridge_name: the name of the integration bridge.
+        :returns: the integration bridge
+        """
+        self.int_br.setup_ofp()
+        self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port)
+        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+                                              table_id=ryu_ofp13.OFPTT_ALL,
+                                              command=ryu_ofp13.OFPFC_DELETE,
+                                              out_group=ryu_ofp13.OFPG_ANY,
+                                              out_port=ryu_ofp13.OFPP_ANY)
+        self.ryu_send_msg(msg)
+        # switch all traffic using L2 learning
+        actions = [self.int_br.ofparser.OFPActionOutput(
+            ryu_ofp13.OFPP_NORMAL, 0)]
+        instructions = [self.int_br.ofparser.OFPInstructionActions(
+            ryu_ofp13.OFPIT_APPLY_ACTIONS,
+            actions)]
+        msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+                                              priority=1,
+                                              instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def setup_ancillary_bridges(self, integ_br, tun_br):
+        """Setup ancillary bridges - for example br-ex."""
+        ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))
+        # Remove all known bridges
+        ovs_bridges.remove(integ_br)
+        if self.enable_tunneling:
+            ovs_bridges.remove(tun_br)
+        br_names = [self.phys_brs[physical_network].br_name for
+                    physical_network in self.phys_brs]
+        ovs_bridges.difference_update(br_names)
+        # Filter list of bridges to those that have external
+        # bridge-id's configured
+        br_names = [
+            bridge for bridge in ovs_bridges
+            if bridge != ovs_lib.get_bridge_external_bridge_id(
+                self.root_helper, bridge)
+        ]
+        ovs_bridges.difference_update(br_names)
+        ancillary_bridges = []
+        for bridge in ovs_bridges:
+            br = OVSBridge(bridge, self.root_helper, self.ryuapp)
+            ancillary_bridges.append(br)
+        LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges)
+        return ancillary_bridges
+
+    def _tun_br_sort_incoming_traffic_depend_in_port(self, br):
+        match = br.ofparser.OFPMatch(
+            in_port=int(self.patch_int_ofport))
+        instructions = [br.ofparser.OFPInstructionGotoTable(
+            table_id=constants.PATCH_LV_TO_TUN)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     priority=1,
+                                     match=match,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+        msg = br.ofparser.OFPFlowMod(br.datapath, priority=0)
+        self.ryu_send_msg(msg)
+
+    def _tun_br_goto_table_ucast_unicast(self, br):
+        match = br.ofparser.OFPMatch(eth_dst=('00:00:00:00:00:00',
+                                              '01:00:00:00:00:00'))
+        instructions = [br.ofparser.OFPInstructionGotoTable(
+            table_id=constants.UCAST_TO_TUN)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=constants.PATCH_LV_TO_TUN,
+                                     match=match,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _tun_br_goto_table_flood_broad_multi_cast(self, br):
+        match = br.ofparser.OFPMatch(eth_dst=('01:00:00:00:00:00',
+                                              '01:00:00:00:00:00'))
+        instructions = [br.ofparser.OFPInstructionGotoTable(
+            table_id=constants.FLOOD_TO_TUN)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=constants.PATCH_LV_TO_TUN,
+                                     match=match,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _tun_br_set_table_tun_by_tunnel_type(self, br):
+        for tunnel_type in constants.TUNNEL_NETWORK_TYPES:
+            msg = br.ofparser.OFPFlowMod(
+                br.datapath,
+                table_id=constants.TUN_TABLE[tunnel_type],
+                priority=0)
+            self.ryu_send_msg(msg)
+
+    def _tun_br_output_patch_int(self, br):
+        actions = [br.ofparser.OFPActionOutput(
+            int(self.patch_int_ofport), 0)]
+        instructions = [br.ofparser.OFPInstructionActions(
+            ryu_ofp13.OFPIT_APPLY_ACTIONS,
+            actions)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=constants.LEARN_FROM_TUN,
+                                     priority=1,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _tun_br_goto_table_flood_unknown_unicast(self, br):
+        instructions = [br.ofparser.OFPInstructionGotoTable(
+            table_id=constants.FLOOD_TO_TUN)]
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     table_id=constants.UCAST_TO_TUN,
+                                     priority=0,
+                                     instructions=instructions)
+        self.ryu_send_msg(msg)
+
+    def _tun_br_default_drop(self, br):
+        msg = br.ofparser.OFPFlowMod(
+            br.datapath,
+            table_id=constants.FLOOD_TO_TUN,
+            priority=0)
+        self.ryu_send_msg(msg)
+
+    def setup_tunnel_br(self, tun_br):
+        """Setup the tunnel bridge.
+
+        Creates tunnel bridge, and links it to the integration bridge
+        using a patch port.
+
+        :param tun_br: the name of the tunnel bridge.
+        """
+        self.tun_br = OVSBridge(tun_br, self.root_helper, self.ryuapp)
+        self.tun_br.reset_bridge()
+        self.tun_br.setup_ofp()
+        self.patch_tun_ofport = self.int_br.add_patch_port(
+            cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
+        self.patch_int_ofport = self.tun_br.add_patch_port(
+            cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
+        if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
+            LOG.error(_("Failed to create OVS patch port. Cannot have "
+                        "tunneling enabled on this agent, since this version "
+                        "of OVS does not support tunnels or patch ports. "
+                        "Agent terminated!"))
+            raise SystemExit(1)
+        msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
+                                              table_id=ryu_ofp13.OFPTT_ALL,
+                                              command=ryu_ofp13.OFPFC_DELETE,
+                                              out_group=ryu_ofp13.OFPG_ANY,
+                                              out_port=ryu_ofp13.OFPP_ANY)
+        self.ryu_send_msg(msg)
+
+        self._tun_br_sort_incoming_traffic_depend_in_port(self.tun_br)
+        self._tun_br_goto_table_ucast_unicast(self.tun_br)
+        self._tun_br_goto_table_flood_broad_multi_cast(self.tun_br)
+        self._tun_br_set_table_tun_by_tunnel_type(self.tun_br)
+        self._tun_br_output_patch_int(self.tun_br)
+        self._tun_br_goto_table_flood_unknown_unicast(self.tun_br)
+        self._tun_br_default_drop(self.tun_br)
+
+    def _phys_br_prepare_create_veth(self, br, int_veth_name, phys_veth_name):
+        self.int_br.delete_port(int_veth_name)
+        br.delete_port(phys_veth_name)
+        if ip_lib.device_exists(int_veth_name, self.root_helper):
+            ip_lib.IPDevice(int_veth_name, self.root_helper).link.delete()
+            # Give udev a chance to process its rules here, to avoid
+            # race conditions between commands launched by udev rules
+            # and the subsequent call to ip_wrapper.add_veth
+            utils.execute(['/sbin/udevadm', 'settle', '--timeout=10'])
+
+    def _phys_br_create_veth(self, br, int_veth_name,
+                             phys_veth_name, physical_network, ip_wrapper):
+        int_veth, phys_veth = ip_wrapper.add_veth(int_veth_name,
+                                                  phys_veth_name)
+        self.int_ofports[physical_network] = self.int_br.add_port(int_veth)
+        self.phys_ofports[physical_network] = br.add_port(phys_veth)
+        return (int_veth, phys_veth)
+
+    def _phys_br_block_untranslated_traffic(self, br, physical_network):
+        match = br.ofparser.OFPMatch(in_port=int(
+            self.int_ofports[physical_network]))
+        msg = br.ofparser.OFPFlowMod(self.int_br.datapath,
+                                     priority=2, match=match)
+        self.ryu_send_msg(msg)
+        match = br.ofparser.OFPMatch(in_port=int(
+            self.phys_ofports[physical_network]))
+        msg = br.ofparser.OFPFlowMod(br.datapath, priority=2, match=match)
+        self.ryu_send_msg(msg)
+
+    def _phys_br_enable_veth_to_pass_traffic(self, int_veth, phys_veth):
+        # enable veth to pass traffic
+        int_veth.link.set_up()
+        phys_veth.link.set_up()
+
+        if self.veth_mtu:
+            # set up mtu size for veth interfaces
+            int_veth.link.set_mtu(self.veth_mtu)
+            phys_veth.link.set_mtu(self.veth_mtu)
+
+    def _phys_br_patch_physical_bridge_with_integration_bridge(
+            self, br, physical_network, bridge, ip_wrapper):
+        int_veth_name = constants.VETH_INTEGRATION_PREFIX + bridge
+        phys_veth_name = constants.VETH_PHYSICAL_PREFIX + bridge
+        self._phys_br_prepare_create_veth(br, int_veth_name, phys_veth_name)
+        int_veth, phys_veth = self._phys_br_create_veth(br, int_veth_name,
+                                                        phys_veth_name,
+                                                        physical_network,
+                                                        ip_wrapper)
+        self._phys_br_block_untranslated_traffic(br, physical_network)
+        self._phys_br_enable_veth_to_pass_traffic(int_veth, phys_veth)
+
+    def setup_physical_bridges(self, bridge_mappings):
+        """Setup the physical network bridges.
+
+        Creates physical network bridges and links them to the
+        integration bridge using veths.
+
+        :param bridge_mappings: map physical network names to bridge names.
+        """
+        self.phys_brs = {}
+        self.int_ofports = {}
+        self.phys_ofports = {}
+        ip_wrapper = ip_lib.IPWrapper(self.root_helper)
+        for physical_network, bridge in bridge_mappings.iteritems():
+            LOG.info(_("Mapping physical network %(physical_network)s to "
+                       "bridge %(bridge)s"),
+                     {'physical_network': physical_network,
+                      'bridge': bridge})
+            # setup physical bridge
+            if not ip_lib.device_exists(bridge, self.root_helper):
+                LOG.error(_("Bridge %(bridge)s for physical network "
+                            "%(physical_network)s does not exist. Agent "
+                            "terminated!"),
+                          {'physical_network': physical_network,
+                           'bridge': bridge})
+                raise SystemExit(1)
+            br = OVSBridge(bridge, self.root_helper, self.ryuapp)
+            br.setup_ofp()
+            msg = br.ofparser.OFPFlowMod(br.datapath,
+                                         table_id=ryu_ofp13.OFPTT_ALL,
+                                         command=ryu_ofp13.OFPFC_DELETE,
+                                         out_group=ryu_ofp13.OFPG_ANY,
+                                         out_port=ryu_ofp13.OFPP_ANY)
+            self.ryu_send_msg(msg)
+            actions = [br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+            instructions = [br.ofparser.OFPInstructionActions(
+                ryu_ofp13.OFPIT_APPLY_ACTIONS,
+                actions)]
+            msg = br.ofparser.OFPFlowMod(br.datapath,
+                                         priority=1,
+                                         instructions=instructions)
+            self.ryu_send_msg(msg)
+            self.phys_brs[physical_network] = br
+
+            self._phys_br_patch_physical_bridge_with_integration_bridge(
+                br, physical_network, bridge, ip_wrapper)
+
+    def update_ports(self, registered_ports):
+        ports = self.int_br.get_vif_port_set()
+        if ports == registered_ports:
+            return
+        self.int_br_device_count = len(ports)
+        added = ports - registered_ports
+        removed = registered_ports - ports
+        return {'current': ports,
+                'added': added,
+                'removed': removed}
+
+    def update_ancillary_ports(self, registered_ports):
+        ports = set()
+        for bridge in self.ancillary_brs:
+            ports |= bridge.get_vif_port_set()
+
+        if ports == registered_ports:
+            return
+        added = ports - registered_ports
+        removed = registered_ports - ports
+        return {'current': ports,
+                'added': added,
+                'removed': removed}
+
+    def treat_vif_port(self, vif_port, port_id, network_id, network_type,
+                       physical_network, segmentation_id, admin_state_up):
+        if vif_port:
+            if admin_state_up:
+                self.port_bound(vif_port, network_id, network_type,
+                                physical_network, segmentation_id)
+            else:
+                self.port_dead(vif_port)
+        else:
+            LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
+
+    def setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
+        ofport = self.tun_br.add_tunnel_port(port_name,
+                                             remote_ip,
+                                             self.local_ip,
+                                             tunnel_type,
+                                             self.vxlan_udp_port)
+        ofport_int = -1
+        try:
+            ofport_int = int(ofport)
+        except (TypeError, ValueError):
+            LOG.exception(_("ofport should have a value that can be "
+                            "interpreted as an integer"))
+        if ofport_int < 0:
+            LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"),
+                      {'type': tunnel_type, 'ip': remote_ip})
+            return 0
+
+        self.tun_br_ofports[tunnel_type][remote_ip] = ofport
+        # Add flow in default table to resubmit to the right
+        # tunelling table (lvid will be set in the latter)
+        match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))
+        instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(
+            table_id=constants.TUN_TABLE[tunnel_type])]
+        msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
+                                              priority=1,
+                                              match=match,
+                                              instructions=instructions)
+        self.ryu_send_msg(msg)
+
+        ofports = [int(p) for p in self.tun_br_ofports[tunnel_type].values()]
+        if ofports:
+            # Update flooding flows to include the new tunnel
+            for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+                if vlan_mapping.network_type == tunnel_type:
+                    match = self.tun_br.ofparser.OFPMatch(
+                        vlan_vid=int(vlan_mapping.vlan) |
+                        ryu_ofp13.OFPVID_PRESENT)
+                    actions = [
+                        self.tun_br.ofparser.OFPActionPopVlan(),
+                        self.tun_br.ofparser.OFPActionSetField(
+                            tunnel_id=int(vlan_mapping.segmentation_id))]
+                    actions.extend(
+                        self.tun_br.ofparser.OFPActionOutput(p, 0)
+                        for p in ofports
+                    )
+                    instructions = [
+                        self.tun_br.ofparser.OFPInstructionActions(
+                            ryu_ofp13.OFPIT_APPLY_ACTIONS,
+                            actions)]
+                    msg = self.tun_br.ofparser.OFPFlowMod(
+                        self.tun_br.datapath,
+                        table_id=constants.FLOOD_TO_TUN,
+                        priority=1,
+                        match=match,
+                        instructions=instructions)
+                    self.ryu_send_msg(msg)
+        return ofport
+
+    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+        # Check if this tunnel port is still used
+        for lvm in self.local_vlan_map.values():
+            if tun_ofport in lvm.tun_ofports:
+                break
+        # If not, remove it
+        else:
+            for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items():
+                if ofport == tun_ofport:
+                    port_name = '%s-%s' % (tunnel_type, remote_ip)
+                    self.tun_br.delete_port(port_name)
+                    self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
+
+    def treat_devices_added(self, devices):
+        resync = False
+        self.sg_agent.prepare_devices_filter(devices)
+        for device in devices:
+            LOG.info(_("Port %s added"), device)
+            try:
+                details = self.plugin_rpc.get_device_details(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug(_("Unable to get port details for "
+                            "%(device)s: %(e)s"),
+                          {'device': device, 'e': e})
+                resync = True
+                continue
+            port = self.int_br.get_vif_port_by_id(details['device'])
+            if 'port_id' in details:
+                LOG.info(_("Port %(device)s updated. Details: %(details)s"),
+                         {'device': device, 'details': details})
+                self.treat_vif_port(port, details['port_id'],
+                                    details['network_id'],
+                                    details['network_type'],
+                                    details['physical_network'],
+                                    details['segmentation_id'],
+                                    details['admin_state_up'])
+
+                # update plugin about port status
+                self.plugin_rpc.update_device_up(self.context,
+                                                 device,
+                                                 self.agent_id,
+                                                 cfg.CONF.host)
+            else:
+                LOG.debug(_("Device %s not defined on plugin"), device)
+                if (port and int(port.ofport) != -1):
+                    self.port_dead(port)
+        return resync
+
+    def treat_ancillary_devices_added(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info(_("Ancillary Port %s added"), device)
+            try:
+                self.plugin_rpc.get_device_details(self.context, device,
+                                                   self.agent_id)
+            except Exception as e:
+                LOG.debug(_("Unable to get port details for "
+                            "%(device)s: %(e)s"),
+                          {'device': device, 'e': e})
+                resync = True
+                continue
+
+            # update plugin about port status
+            self.plugin_rpc.update_device_up(self.context,
+                                             device,
+                                             self.agent_id,
+                                             cfg.CONF.host)
+        return resync
+
+    def treat_devices_removed(self, devices):
+        resync = False
+        self.sg_agent.remove_devices_filter(devices)
+        for device in devices:
+            LOG.info(_("Attachment %s removed"), device)
+            try:
+                self.plugin_rpc.update_device_down(self.context,
+                                                   device,
+                                                   self.agent_id,
+                                                   cfg.CONF.host)
+            except Exception as e:
+                LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
+                          {'device': device, 'e': e})
+                resync = True
+                continue
+            self.port_unbound(device)
+        return resync
+
+    def treat_ancillary_devices_removed(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info(_("Attachment %s removed"), device)
+            try:
+                details = self.plugin_rpc.update_device_down(self.context,
+                                                             device,
+                                                             self.agent_id,
+                                                             cfg.CONF.host)
+            except Exception as e:
+                LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
+                          {'device': device, 'e': e})
+                resync = True
+                continue
+            if details['exists']:
+                LOG.info(_("Port %s updated."), device)
+                # Nothing to do regarding local networking
+            else:
+                LOG.debug(_("Device %s not defined on plugin"), device)
+        return resync
+
+    def process_network_ports(self, port_info):
+        resync_add = False
+        resync_removed = False
+        if 'added' in port_info:
+            start = time.time()
+            resync_add = self.treat_devices_added(port_info['added'])
+            LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
+                        "treat_devices_added completed in %(elapsed).3f"),
+                      {'iter_num': self.iter_num,
+                       'elapsed': time.time() - start})
+        if 'removed' in port_info:
+            start = time.time()
+            resync_removed = self.treat_devices_removed(port_info['removed'])
+            LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
+                        "treat_devices_removed completed in %(elapsed).3f"),
+                      {'iter_num': self.iter_num,
+                       'elapsed': time.time() - start})
+        # If one of the above opertaions fails => resync with plugin
+        return (resync_add | resync_removed)
+
+    def process_ancillary_network_ports(self, port_info):
+        resync_add = False
+        resync_removed = False
+        if 'added' in port_info:
+            start = time.time()
+            resync_add = self.treat_ancillary_devices_added(port_info['added'])
+            LOG.debug(_("process_ancillary_network_ports - iteration: "
+                        "%(iter_num)d - treat_ancillary_devices_added "
+                        "completed in %(elapsed).3f"),
+                      {'iter_num': self.iter_num,
+                       'elapsed': time.time() - start})
+        if 'removed' in port_info:
+            start = time.time()
+            resync_removed = self.treat_ancillary_devices_removed(
+                port_info['removed'])
+            LOG.debug(_("process_ancillary_network_ports - iteration: "
+                        "%(iter_num)d - treat_ancillary_devices_removed "
+                        "completed in %(elapsed).3f"),
+                      {'iter_num': self.iter_num,
+                       'elapsed': time.time() - start})
+
+        # If one of the above opertaions fails => resync with plugin
+        return (resync_add | resync_removed)
+
+    def tunnel_sync(self):
+        resync = False
+        try:
+            for tunnel_type in self.tunnel_types:
+                details = self.plugin_rpc.tunnel_sync(self.context,
+                                                      self.local_ip,
+                                                      tunnel_type)
+                tunnels = details['tunnels']
+                for tunnel in tunnels:
+                    if self.local_ip != tunnel['ip_address']:
+                        tunnel_id = tunnel.get('id', tunnel['ip_address'])
+                        tun_name = '%s-%s' % (tunnel_type, tunnel_id)
+                        self.setup_tunnel_port(tun_name,
+                                               tunnel['ip_address'],
+                                               tunnel_type)
+        except Exception as e:
+            LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
+                      {'local_ip': self.local_ip, 'e': e})
+            resync = True
+        return resync
+
+    def ovsdb_monitor_loop(self, polling_manager=None):
+        if not polling_manager:
+            polling_manager = polling.AlwaysPoll()
+
+        sync = True
+        ports = set()
+        ancillary_ports = set()
+        tunnel_sync = True
+        while True:
+            try:
+                start = time.time()
+                port_stats = {'regular': {'added': 0, 'removed': 0},
+                              'ancillary': {'added': 0, 'removed': 0}}
+                LOG.debug(_("Agent ovsdb_monitor_loop - "
+                          "iteration:%d started"),
+                          self.iter_num)
+                if sync:
+                    LOG.info(_("Agent out of sync with plugin!"))
+                    ports.clear()
+                    ancillary_ports.clear()
+                    sync = False
+                    polling_manager.force_polling()
+
+                # Notify the plugin of tunnel IP
+                if self.enable_tunneling and tunnel_sync:
+                    LOG.info(_("Agent tunnel out of sync with plugin!"))
+                    tunnel_sync = self.tunnel_sync()
+                if polling_manager.is_polling_required:
+                    LOG.debug(_("Agent ovsdb_monitor_loop - "
+                                "iteration:%(iter_num)d - "
+                                "starting polling. Elapsed:%(elapsed).3f"),
+                              {'iter_num': self.iter_num,
+                               'elapsed': time.time() - start})
+                    port_info = self.update_ports(ports)
+                    LOG.debug(_("Agent ovsdb_monitor_loop - "
+                                "iteration:%(iter_num)d - "
+                                "port information retrieved. "
+                                "Elapsed:%(elapsed).3f"),
+                              {'iter_num': self.iter_num,
+                               'elapsed': time.time() - start})
+                    # notify plugin about port deltas
+                    if port_info:
+                        LOG.debug(_("Agent loop has new devices!"))
+                        # If treat devices fails - must resync with plugin
+                        sync = self.process_network_ports(port_info)
+                        LOG.debug(_("Agent ovsdb_monitor_loop - "
+                                    "iteration:%(iter_num)d - "
+                                    "ports processed. Elapsed:%(elapsed).3f"),
+                                  {'iter_num': self.iter_num,
+                                   'elapsed': time.time() - start})
+                        ports = port_info['current']
+                        port_stats['regular']['added'] = (
+                            len(port_info.get('added', [])))
+                        port_stats['regular']['removed'] = (
+                            len(port_info.get('removed', [])))
+                    # Treat ancillary devices if they exist
+                    if self.ancillary_brs:
+                        port_info = self.update_ancillary_ports(
+                            ancillary_ports)
+                        LOG.debug(_("Agent ovsdb_monitor_loop - "
+                                    "iteration:%(iter_num)d - "
+                                    "ancillary port info retrieved. "
+                                    "Elapsed:%(elapsed).3f"),
+                                  {'iter_num': self.iter_num,
+                                   'elapsed': time.time() - start})
+
+                        if port_info:
+                            rc = self.process_ancillary_network_ports(
+                                port_info)
+                            LOG.debug(_("Agent ovsdb_monitor_loop - "
+                                        "iteration:"
+                                        "%(iter_num)d - ancillary ports "
+                                        "processed. Elapsed:%(elapsed).3f"),
+                                      {'iter_num': self.iter_num,
+                                       'elapsed': time.time() - start})
+                            ancillary_ports = port_info['current']
+                            port_stats['ancillary']['added'] = (
+                                len(port_info.get('added', [])))
+                            port_stats['ancillary']['removed'] = (
+                                len(port_info.get('removed', [])))
+                            sync = sync | rc
+
+                    polling_manager.polling_completed()
+
+            except Exception:
+                LOG.exception(_("Error in agent event loop"))
+                sync = True
+                tunnel_sync = True
+
+            # sleep till end of polling interval
+            elapsed = (time.time() - start)
+            LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
+                        "completed. Processed ports statistics:"
+                        "%(port_stats)s. Elapsed:%(elapsed).3f"),
+                      {'iter_num': self.iter_num,
+                       'port_stats': port_stats,
+                       'elapsed': elapsed})
+            if (elapsed < self.polling_interval):
+                time.sleep(self.polling_interval - elapsed)
+            else:
+                LOG.debug(_("Loop iteration exceeded interval "
+                            "(%(polling_interval)s vs. %(elapsed)s)!"),
+                          {'polling_interval': self.polling_interval,
+                           'elapsed': elapsed})
+            self.iter_num = self.iter_num + 1
+
+    def daemon_loop(self):
+        with polling.get_polling_manager(
+                self.minimize_polling,
+                self.root_helper,
+                self.ovsdb_monitor_respawn_interval) as pm:
+
+            self.ovsdb_monitor_loop(polling_manager=pm)
+
+
+def create_agent_config_map(config):
+    """Create a map of agent config parameters.
+
+    :param config: an instance of cfg.CONF
+    :returns: a map of agent configuration parameters
+    """
+    try:
+        bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
+    except ValueError as e:
+        raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
+
+    kwargs = dict(
+        integ_br=config.OVS.integration_bridge,
+        tun_br=config.OVS.tunnel_bridge,
+        local_ip=config.OVS.local_ip,
+        bridge_mappings=bridge_mappings,
+        root_helper=config.AGENT.root_helper,
+        polling_interval=config.AGENT.polling_interval,
+        minimize_polling=config.AGENT.minimize_polling,
+        tunnel_types=config.AGENT.tunnel_types,
+        veth_mtu=config.AGENT.veth_mtu,
+        l2_population=False,
+        ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
+    )
+
+    # If enable_tunneling is TRUE, set tunnel_type to default to GRE
+    if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
+        kwargs['tunnel_types'] = [p_const.TYPE_GRE]
+
+    # Verify the tunnel_types specified are valid
+    for tun in kwargs['tunnel_types']:
+        if tun not in constants.TUNNEL_NETWORK_TYPES:
+            msg = _('Invalid tunnel type specificed: %s'), tun
+            raise ValueError(msg)
+        if not kwargs['local_ip']:
+            msg = _('Tunneling cannot be enabled without a valid local_ip.')
+            raise ValueError(msg)
+
+    return kwargs
diff --git a/neutron/plugins/ofagent/common/__init__.py b/neutron/plugins/ofagent/common/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/plugins/ofagent/common/config.py b/neutron/plugins/ofagent/common/config.py
new file mode 100644 (file)
index 0000000..759d3df
--- /dev/null
@@ -0,0 +1,33 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from oslo.config import cfg
+
+from neutron.agent.common import config
+from neutron.plugins.openvswitch.common import config as ovs_config
+
+
+agent_opts = [
+    cfg.IntOpt('get_datapath_retry_times', default=60,
+               help=_("Number of seconds to retry acquiring "
+                      "an Open vSwitch datapath")),
+]
+
+
+cfg.CONF.register_opts(ovs_config.ovs_opts, 'OVS')
+cfg.CONF.register_opts(ovs_config.agent_opts, 'AGENT')
+cfg.CONF.register_opts(agent_opts, 'AGENT')
+config.register_agent_state_opts_helper(cfg.CONF)
+config.register_root_helper(cfg.CONF)
index d1fe5f96a8c3e16b496b213c1fe36a478c7d8b9f..03351fac793869e45a5029554eb05dd07f5325e1 100644 (file)
@@ -14,7 +14,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import distutils.version as dist_version
 import signal
 import sys
 import time
@@ -227,8 +226,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
 
     def _check_ovs_version(self):
         if p_const.TYPE_VXLAN in self.tunnel_types:
-            check_ovs_version(constants.MINIMUM_OVS_VXLAN_VERSION,
-                              self.root_helper)
+            try:
+                ovs_lib.check_ovs_vxlan_version(self.root_helper)
+            except SystemError:
+                LOG.exception(_("Agent terminated"))
+                raise SystemExit(1)
 
     def _report_state(self):
         # How many devices are likely used by a VM
@@ -1250,44 +1252,6 @@ def handle_sigterm(signum, frame):
     sys.exit(1)
 
 
-def check_ovs_version(min_required_version, root_helper):
-    LOG.debug(_("Checking OVS version for VXLAN support"))
-    installed_klm_version = ovs_lib.get_installed_ovs_klm_version()
-    installed_usr_version = ovs_lib.get_installed_ovs_usr_version(root_helper)
-    # First check the userspace version
-    if installed_usr_version:
-        if dist_version.StrictVersion(
-                installed_usr_version) < dist_version.StrictVersion(
-                min_required_version):
-            LOG.error(_('Failed userspace version check for Open '
-                        'vSwitch with VXLAN support. To use '
-                        'VXLAN tunnels with OVS, please ensure '
-                        'the OVS version is %s '
-                        'or newer!'), min_required_version)
-            sys.exit(1)
-        # Now check the kernel version
-        if installed_klm_version:
-            if dist_version.StrictVersion(
-                    installed_klm_version) < dist_version.StrictVersion(
-                    min_required_version):
-                LOG.error(_('Failed kernel version check for Open '
-                            'vSwitch with VXLAN support. To use '
-                            'VXLAN tunnels with OVS, please ensure '
-                            'the OVS version is %s or newer!'),
-                          min_required_version)
-                raise SystemExit(1)
-        else:
-            LOG.warning(_('Cannot determine kernel Open vSwitch version, '
-                          'please ensure your Open vSwitch kernel module '
-                          'is at least version %s to support VXLAN '
-                          'tunnels.'), min_required_version)
-    else:
-        LOG.warning(_('Unable to determine Open vSwitch version. Please '
-                      'ensure that its version is %s or newer to use VXLAN '
-                      'tunnels with OVS.'), min_required_version)
-        raise SystemExit(1)
-
-
 def create_agent_config_map(config):
     """Create a map of agent config parameters.
 
diff --git a/neutron/tests/unit/ml2/drivers/test_ofagent_mech.py b/neutron/tests/unit/ml2/drivers/test_ofagent_mech.py
new file mode 100644 (file)
index 0000000..63daf9e
--- /dev/null
@@ -0,0 +1,74 @@
+# Copyright (c) 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.common import constants
+from neutron.extensions import portbindings
+from neutron.plugins.ml2.drivers import mech_ofagent
+from neutron.tests.unit.ml2 import _test_mech_agent as base
+
+
+class OfagentMechanismBaseTestCase(base.AgentMechanismBaseTestCase):
+    VIF_TYPE = portbindings.VIF_TYPE_OVS
+    CAP_PORT_FILTER = True
+    AGENT_TYPE = constants.AGENT_TYPE_OFA
+
+    GOOD_MAPPINGS = {'fake_physical_network': 'fake_bridge'}
+    GOOD_TUNNEL_TYPES = ['gre', 'vxlan']
+    GOOD_CONFIGS = {'bridge_mappings': GOOD_MAPPINGS,
+                    'tunnel_types': GOOD_TUNNEL_TYPES}
+
+    BAD_MAPPINGS = {'wrong_physical_network': 'wrong_bridge'}
+    BAD_TUNNEL_TYPES = ['bad_tunnel_type']
+    BAD_CONFIGS = {'bridge_mappings': BAD_MAPPINGS,
+                   'tunnel_types': BAD_TUNNEL_TYPES}
+
+    AGENTS = [{'alive': True,
+               'configurations': GOOD_CONFIGS}]
+    AGENTS_DEAD = [{'alive': False,
+                    'configurations': GOOD_CONFIGS}]
+    AGENTS_BAD = [{'alive': False,
+                   'configurations': GOOD_CONFIGS},
+                  {'alive': True,
+                   'configurations': BAD_CONFIGS}]
+
+    def setUp(self):
+        super(OfagentMechanismBaseTestCase, self).setUp()
+        self.driver = mech_ofagent.OfagentMechanismDriver()
+        self.driver.initialize()
+
+
+class OfagentMechanismGenericTestCase(OfagentMechanismBaseTestCase,
+                                      base.AgentMechanismGenericTestCase):
+    pass
+
+
+class OfagentMechanismLocalTestCase(OfagentMechanismBaseTestCase,
+                                    base.AgentMechanismLocalTestCase):
+    pass
+
+
+class OfagentMechanismFlatTestCase(OfagentMechanismBaseTestCase,
+                                   base.AgentMechanismFlatTestCase):
+    pass
+
+
+class OfagentMechanismVlanTestCase(OfagentMechanismBaseTestCase,
+                                   base.AgentMechanismVlanTestCase):
+    pass
+
+
+class OfagentMechanismGreTestCase(OfagentMechanismBaseTestCase,
+                                  base.AgentMechanismGreTestCase):
+    pass
diff --git a/neutron/tests/unit/ofagent/__init__.py b/neutron/tests/unit/ofagent/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/ofagent/fake_oflib.py b/neutron/tests/unit/ofagent/fake_oflib.py
new file mode 100644 (file)
index 0000000..822c49c
--- /dev/null
@@ -0,0 +1,43 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import mock
+
+
+def patch_fake_oflib_of():
+    ryu_mod = mock.Mock()
+    ryu_base_mod = ryu_mod.base
+    ryu_lib_mod = ryu_mod.lib
+    ryu_lib_hub = ryu_lib_mod.hub
+    ryu_ofproto_mod = ryu_mod.ofproto
+    ryu_ofproto_of13 = ryu_ofproto_mod.ofproto_v1_3
+    ryu_ofproto_of13.OFPTT_ALL = 0xff
+    ryu_ofproto_of13.OFPG_ANY = 0xffffffff
+    ryu_ofproto_of13.OFPP_ANY = 0xffffffff
+    ryu_ofproto_of13.OFPFC_ADD = 0
+    ryu_ofproto_of13.OFPFC_DELETE = 3
+    ryu_app_mod = ryu_mod.app
+    ryu_app_ofctl_mod = ryu_app_mod.ofctl
+    ryu_ofctl_api = ryu_app_ofctl_mod.api
+    return mock.patch.dict('sys.modules',
+                           {'ryu': ryu_mod,
+                            'ryu.base': ryu_base_mod,
+                            'ryu.lib': ryu_lib_mod,
+                            'ryu.lib.hub': ryu_lib_hub,
+                            'ryu.ofproto': ryu_ofproto_mod,
+                            'ryu.ofproto.ofproto_v1_3': ryu_ofproto_of13,
+                            'ryu.app': ryu_app_mod,
+                            'ryu.app.ofctl': ryu_app_ofctl_mod,
+                            'ryu.app.ofctl.api': ryu_ofctl_api})
diff --git a/neutron/tests/unit/ofagent/test_ofa_defaults.py b/neutron/tests/unit/ofagent/test_ofa_defaults.py
new file mode 100644 (file)
index 0000000..635070b
--- /dev/null
@@ -0,0 +1,25 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from oslo.config import cfg
+
+from neutron.plugins.ofagent.common import config  # noqa
+from neutron.tests import base
+
+
+class ConfigurationTest(base.BaseTestCase):
+    """Configuration file Tests."""
+    def test_ml2_defaults(self):
+        self.assertEqual(60, cfg.CONF.AGENT.get_datapath_retry_times)
diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py
new file mode 100644 (file)
index 0000000..1c5b347
--- /dev/null
@@ -0,0 +1,660 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on test for openvswitch agent(test_ovs_neutron_agent.py).
+#
+# Copyright (c) 2012 OpenStack Foundation.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import contextlib
+
+import mock
+from oslo.config import cfg
+import testtools
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import utils
+from neutron.openstack.common import importutils
+from neutron.openstack.common.rpc import common as rpc_common
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.openvswitch.common import constants
+from neutron.tests import base
+from neutron.tests.unit.ofagent import fake_oflib
+
+
+NOTIFIER = ('neutron.plugins.ml2.rpc.AgentNotifierApi')
+
+
+class OFAAgentTestCase(base.BaseTestCase):
+
+    _AGENT_NAME = 'neutron.plugins.ofagent.agent.ofa_neutron_agent'
+
+    def setUp(self):
+        super(OFAAgentTestCase, self).setUp()
+        self.addCleanup(mock.patch.stopall)
+        self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start()
+        self.mod_agent = importutils.import_module(self._AGENT_NAME)
+        self.ryuapp = mock.Mock()
+        cfg.CONF.register_cli_opts([
+            cfg.StrOpt('ofp-listen-host', default='',
+                       help='openflow listen host'),
+            cfg.IntOpt('ofp-tcp-listen-port', default=6633,
+                       help='openflow tcp listen port')
+        ])
+        cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT')
+
+
+class CreateAgentConfigMap(OFAAgentTestCase):
+
+    def test_create_agent_config_map_succeeds(self):
+        self.assertTrue(self.mod_agent.create_agent_config_map(cfg.CONF))
+
+    def test_create_agent_config_map_fails_for_invalid_tunnel_config(self):
+        # An ip address is required for tunneling but there is no default,
+        # verify this for both gre and vxlan tunnels.
+        cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE],
+                              group='AGENT')
+        with testtools.ExpectedException(ValueError):
+            self.mod_agent.create_agent_config_map(cfg.CONF)
+        cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN],
+                              group='AGENT')
+        with testtools.ExpectedException(ValueError):
+            self.mod_agent.create_agent_config_map(cfg.CONF)
+
+    def test_create_agent_config_map_enable_tunneling(self):
+        # Verify setting only enable_tunneling will default tunnel_type to GRE
+        cfg.CONF.set_override('tunnel_types', None, group='AGENT')
+        cfg.CONF.set_override('enable_tunneling', True, group='OVS')
+        cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
+        cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
+        self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE])
+
+    def test_create_agent_config_map_fails_no_local_ip(self):
+        # An ip address is required for tunneling but there is no default
+        cfg.CONF.set_override('enable_tunneling', True, group='OVS')
+        with testtools.ExpectedException(ValueError):
+            self.mod_agent.create_agent_config_map(cfg.CONF)
+
+    def test_create_agent_config_map_fails_for_invalid_tunnel_type(self):
+        cfg.CONF.set_override('tunnel_types', ['foobar'], group='AGENT')
+        with testtools.ExpectedException(ValueError):
+            self.mod_agent.create_agent_config_map(cfg.CONF)
+
+    def test_create_agent_config_map_multiple_tunnel_types(self):
+        cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
+        cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE,
+                              p_const.TYPE_VXLAN], group='AGENT')
+        cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
+        self.assertEqual(cfgmap['tunnel_types'],
+                         [p_const.TYPE_GRE, p_const.TYPE_VXLAN])
+
+
+class TestOFANeutronAgentOVSBridge(OFAAgentTestCase):
+
+    def setUp(self):
+        super(TestOFANeutronAgentOVSBridge, self).setUp()
+        self.br_name = 'bridge1'
+        self.root_helper = 'fake_helper'
+        self.ovs = self.mod_agent.OVSBridge(
+            self.br_name, self.root_helper, self.ryuapp)
+
+    def test_find_datapath_id(self):
+        with mock.patch.object(self.ovs, 'get_datapath_id',
+                               return_value='12345'):
+            self.ovs.find_datapath_id()
+        self.assertEqual(self.ovs.datapath_id, '12345')
+
+    def _fake_get_datapath(self, app, datapath_id):
+        if self.ovs.retry_count >= 2:
+            datapath = mock.Mock()
+            datapath.ofproto_parser = mock.Mock()
+            return datapath
+        self.ovs.retry_count += 1
+        return None
+
+    def test_get_datapath_normal(self):
+        self.ovs.retry_count = 0
+        with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+                               new=self._fake_get_datapath):
+            self.ovs.datapath_id = '0x64'
+            self.ovs.get_datapath(retry_max=4)
+        self.assertEqual(self.ovs.retry_count, 2)
+
+    def test_get_datapath_retry_out_by_default_time(self):
+        cfg.CONF.set_override('get_datapath_retry_times', 3, group='AGENT')
+        with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+                               return_value=None) as mock_get_datapath:
+            with testtools.ExpectedException(SystemExit):
+                self.ovs.datapath_id = '0x64'
+                self.ovs.get_datapath(retry_max=3)
+        self.assertEqual(mock_get_datapath.call_count, 3)
+
+    def test_get_datapath_retry_out_by_specified_time(self):
+        with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+                               return_value=None) as mock_get_datapath:
+            with testtools.ExpectedException(SystemExit):
+                self.ovs.datapath_id = '0x64'
+                self.ovs.get_datapath(retry_max=2)
+        self.assertEqual(mock_get_datapath.call_count, 2)
+
+    def test_setup_ofp_default_par(self):
+        with contextlib.nested(
+            mock.patch.object(self.ovs, 'set_protocols'),
+            mock.patch.object(self.ovs, 'set_controller'),
+            mock.patch.object(self.ovs, 'find_datapath_id'),
+            mock.patch.object(self.ovs, 'get_datapath'),
+        ) as (mock_set_protocols, mock_set_controller,
+              mock_find_datapath_id, mock_get_datapath):
+            self.ovs.setup_ofp()
+        mock_set_protocols.assert_called_with('OpenFlow13')
+        mock_set_controller.assert_called_with(['tcp:127.0.0.1:6633'])
+        mock_get_datapath.assert_called_with(
+            cfg.CONF.AGENT.get_datapath_retry_times)
+        self.assertEqual(mock_find_datapath_id.call_count, 1)
+
+    def test_setup_ofp_specify_par(self):
+        controller_names = ['tcp:192.168.10.10:1234', 'tcp:172.17.16.20:5555']
+        with contextlib.nested(
+            mock.patch.object(self.ovs, 'set_protocols'),
+            mock.patch.object(self.ovs, 'set_controller'),
+            mock.patch.object(self.ovs, 'find_datapath_id'),
+            mock.patch.object(self.ovs, 'get_datapath'),
+        ) as (mock_set_protocols, mock_set_controller,
+              mock_find_datapath_id, mock_get_datapath):
+            self.ovs.setup_ofp(controller_names=controller_names,
+                               protocols='OpenFlow133',
+                               retry_max=11)
+        mock_set_protocols.assert_called_with('OpenFlow133')
+        mock_set_controller.assert_called_with(controller_names)
+        mock_get_datapath.assert_called_with(11)
+        self.assertEqual(mock_find_datapath_id.call_count, 1)
+
+    def test_setup_ofp_with_except(self):
+        with contextlib.nested(
+            mock.patch.object(self.ovs, 'set_protocols',
+                              side_effect=RuntimeError),
+            mock.patch.object(self.ovs, 'set_controller'),
+            mock.patch.object(self.ovs, 'find_datapath_id'),
+            mock.patch.object(self.ovs, 'get_datapath'),
+        ) as (mock_set_protocols, mock_set_controller,
+              mock_find_datapath_id, mock_get_datapath):
+            with testtools.ExpectedException(SystemExit):
+                self.ovs.setup_ofp()
+
+
+class TestOFANeutronAgent(OFAAgentTestCase):
+
+    def setUp(self):
+        super(TestOFANeutronAgent, self).setUp()
+        notifier_p = mock.patch(NOTIFIER)
+        notifier_cls = notifier_p.start()
+        self.notifier = mock.Mock()
+        notifier_cls.return_value = self.notifier
+        # Avoid rpc initialization for unit tests
+        cfg.CONF.set_override('rpc_backend',
+                              'neutron.openstack.common.rpc.impl_fake')
+        kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
+
+        class MockFixedIntervalLoopingCall(object):
+            def __init__(self, f):
+                self.f = f
+
+            def start(self, interval=0):
+                self.f()
+
+        with contextlib.nested(
+            mock.patch.object(self.mod_agent.OFANeutronAgent,
+                              'setup_integration_br',
+                              return_value=mock.Mock()),
+            mock.patch.object(self.mod_agent.OFANeutronAgent,
+                              'setup_ancillary_bridges',
+                              return_value=[]),
+            mock.patch.object(self.mod_agent.OVSBridge,
+                              'get_local_port_mac',
+                              return_value='00:00:00:00:00:01'),
+            mock.patch('neutron.agent.linux.utils.get_interface_mac',
+                       return_value='00:00:00:00:00:01'),
+            mock.patch('neutron.openstack.common.loopingcall.'
+                       'FixedIntervalLoopingCall',
+                       new=MockFixedIntervalLoopingCall)):
+            self.agent = self.mod_agent.OFANeutronAgent(self.ryuapp, **kwargs)
+            self.agent.tun_br = mock.Mock()
+            self.datapath = mock.Mock()
+            self.ofparser = mock.Mock()
+            self.datapath.ofparser = self.ofparser
+            self.ofparser.OFPMatch = mock.Mock()
+            self.ofparser.OFPMatch.return_value = mock.Mock()
+            self.ofparser.OFPFlowMod = mock.Mock()
+            self.ofparser.OFPFlowMod.return_value = mock.Mock()
+            self.agent.int_br.ofparser = self.ofparser
+
+        self.agent.sg_agent = mock.Mock()
+
+    def _mock_port_bound(self, ofport=None):
+        port = mock.Mock()
+        port.ofport = ofport
+        net_uuid = 'my-net-uuid'
+        with mock.patch.object(self.mod_agent.OVSBridge,
+                               'set_db_attribute',
+                               return_value=True):
+            with mock.patch.object(self.agent,
+                                   'ryu_send_msg') as ryu_send_msg_func:
+                self.agent.port_bound(port, net_uuid, 'local', None, None)
+        self.assertEqual(ryu_send_msg_func.called, ofport != -1)
+
+    def test_port_bound_deletes_flows_for_valid_ofport(self):
+        self._mock_port_bound(ofport=1)
+
+    def test_port_bound_ignores_flows_for_invalid_ofport(self):
+        self._mock_port_bound(ofport=-1)
+
+    def test_port_dead(self):
+        with mock.patch.object(self.mod_agent.OVSBridge,
+                               'set_db_attribute',
+                               return_value=True):
+            with mock.patch.object(self.agent,
+                                   'ryu_send_msg') as ryu_send_msg_func:
+                port = mock.Mock()
+                port.ofport = 2
+                self.agent.port_dead(port)
+        self.assertTrue(ryu_send_msg_func.called)
+
+    def mock_update_ports(self, vif_port_set=None, registered_ports=None):
+        with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
+                               return_value=vif_port_set):
+            return self.agent.update_ports(registered_ports)
+
+    def test_update_ports_returns_none_for_unchanged_ports(self):
+        self.assertIsNone(self.mock_update_ports())
+
+    def test_update_ports_returns_port_changes(self):
+        vif_port_set = set([1, 3])
+        registered_ports = set([1, 2])
+        expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
+        actual = self.mock_update_ports(vif_port_set, registered_ports)
+        self.assertEqual(expected, actual)
+
+    def test_treat_devices_added_returns_true_for_missing_device(self):
+        with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+                               side_effect=Exception()):
+            self.assertTrue(self.agent.treat_devices_added([{}]))
+
+    def _mock_treat_devices_added(self, details, port, func_name):
+        """Mock treat devices added.
+
+        :param details: the details to return for the device
+        :param port: the port that get_vif_port_by_id should return
+        :param func_name: the function that should be called
+        :returns: whether the named function was called
+        """
+        with contextlib.nested(
+            mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+                              return_value=details),
+            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+                              return_value=port),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+            mock.patch.object(self.agent, func_name)
+        ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
+            self.assertFalse(self.agent.treat_devices_added([{}]))
+        return func.called
+
+    def test_treat_devices_added_ignores_invalid_ofport(self):
+        port = mock.Mock()
+        port.ofport = -1
+        self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
+                                                        'port_dead'))
+
+    def test_treat_devices_added_marks_unknown_port_as_dead(self):
+        port = mock.Mock()
+        port.ofport = 1
+        self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
+                                                       'port_dead'))
+
+    def test_treat_devices_added_updates_known_port(self):
+        details = mock.MagicMock()
+        details.__contains__.side_effect = lambda x: True
+        self.assertTrue(self._mock_treat_devices_added(details,
+                                                       mock.Mock(),
+                                                       'treat_vif_port'))
+
+    def test_treat_devices_removed_returns_true_for_missing_device(self):
+        with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
+                               side_effect=Exception()):
+            self.assertTrue(self.agent.treat_devices_removed([{}]))
+
+    def _mock_treat_devices_removed(self, port_exists):
+        details = dict(exists=port_exists)
+        with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
+                               return_value=details):
+            with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
+                self.assertFalse(self.agent.treat_devices_removed([{}]))
+        self.assertTrue(port_unbound.called)
+
+    def test_treat_devices_removed_unbinds_port(self):
+        self._mock_treat_devices_removed(True)
+
+    def test_treat_devices_removed_ignores_missing_port(self):
+        self._mock_treat_devices_removed(False)
+
+    def test_process_network_ports(self):
+        reply = {'current': set(['tap0']),
+                 'removed': set(['eth0']),
+                 'added': set(['eth1'])}
+        with mock.patch.object(self.agent, 'treat_devices_added',
+                               return_value=False) as device_added:
+            with mock.patch.object(self.agent, 'treat_devices_removed',
+                                   return_value=False) as device_removed:
+                self.assertFalse(self.agent.process_network_ports(reply))
+                device_added.assert_called_once_with(set(['eth1']))
+                device_removed.assert_called_once_with(set(['eth0']))
+
+    def test_report_state(self):
+        with mock.patch.object(self.agent.state_rpc,
+                               "report_state") as report_st:
+            self.agent.int_br_device_count = 5
+            self.agent._report_state()
+            report_st.assert_called_with(self.agent.context,
+                                         self.agent.agent_state)
+            self.assertNotIn("start_flag", self.agent.agent_state)
+            self.assertEqual(
+                self.agent.agent_state["configurations"]["devices"],
+                self.agent.int_br_device_count
+            )
+
+    def test_network_delete(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent, "reclaim_local_vlan"),
+            mock.patch.object(self.agent.tun_br, "cleanup_tunnel_port")
+        ) as (recl_fn, clean_tun_fn):
+            self.agent.network_delete("unused_context",
+                                      network_id="123")
+            self.assertFalse(recl_fn.called)
+            self.agent.local_vlan_map["123"] = "LVM object"
+            self.agent.network_delete("unused_context",
+                                      network_id="123")
+            self.assertFalse(clean_tun_fn.called)
+            recl_fn.assert_called_with("123")
+
+    def test_port_update(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
+            mock.patch.object(self.agent, "treat_vif_port"),
+            mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
+            mock.patch.object(self.agent.plugin_rpc, "update_device_down")
+        ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
+            port = {"id": "123",
+                    "network_id": "124",
+                    "admin_state_up": False}
+            getvif_fn.return_value = "vif_port_obj"
+            self.agent.port_update("unused_context",
+                                   port=port,
+                                   network_type="vlan",
+                                   segmentation_id="1",
+                                   physical_network="physnet")
+            treatvif_fn.assert_called_with("vif_port_obj", "123",
+                                           "124", "vlan", "physnet",
+                                           "1", False)
+            upddown_fn.assert_called_with(self.agent.context,
+                                          "123", self.agent.agent_id,
+                                          cfg.CONF.host)
+
+            port["admin_state_up"] = True
+            self.agent.port_update("unused_context",
+                                   port=port,
+                                   network_type="vlan",
+                                   segmentation_id="1",
+                                   physical_network="physnet")
+            updup_fn.assert_called_with(self.agent.context,
+                                        "123", self.agent.agent_id,
+                                        cfg.CONF.host)
+
+    def test_port_update_plugin_rpc_failed(self):
+        port = {'id': 1,
+                'network_id': 1,
+                'admin_state_up': True}
+        with contextlib.nested(
+            mock.patch.object(self.mod_agent.LOG, 'error'),
+            mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+            mock.patch.object(self.agent, 'port_bound'),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+            mock.patch.object(self.agent, 'port_dead')
+        ) as (log, _, device_up, _, device_down, _):
+            device_up.side_effect = rpc_common.Timeout
+            self.agent.port_update(mock.Mock(), port=port)
+            self.assertTrue(device_up.called)
+            self.assertEqual(log.call_count, 1)
+
+            log.reset_mock()
+            port['admin_state_up'] = False
+            device_down.side_effect = rpc_common.Timeout
+            self.agent.port_update(mock.Mock(), port=port)
+            self.assertTrue(device_down.called)
+            self.assertEqual(log.call_count, 1)
+
+    def test_setup_physical_bridges(self):
+        with contextlib.nested(
+            mock.patch.object(ip_lib, "device_exists"),
+            mock.patch.object(utils, "execute"),
+            mock.patch.object(self.mod_agent.OVSBridge, "add_port"),
+            mock.patch.object(self.mod_agent.OVSBridge, "delete_port"),
+            mock.patch.object(self.mod_agent.OVSBridge, "set_protocols"),
+            mock.patch.object(self.mod_agent.OVSBridge, "set_controller"),
+            mock.patch.object(self.mod_agent.OVSBridge, "get_datapath_id",
+                              return_value='0xa'),
+            mock.patch.object(self.agent.int_br, "add_port"),
+            mock.patch.object(self.agent.int_br, "delete_port"),
+            mock.patch.object(ip_lib.IPWrapper, "add_veth"),
+            mock.patch.object(ip_lib.IpLinkCommand, "delete"),
+            mock.patch.object(ip_lib.IpLinkCommand, "set_up"),
+            mock.patch.object(ip_lib.IpLinkCommand, "set_mtu"),
+            mock.patch.object(self.mod_agent.ryu_api, "get_datapath",
+                              return_value=self.datapath)
+        ) as (devex_fn, utilsexec_fn,
+              ovs_addport_fn, ovs_delport_fn, ovs_set_protocols_fn,
+              ovs_set_controller_fn, ovs_datapath_id_fn, br_addport_fn,
+              br_delport_fn, addveth_fn, linkdel_fn, linkset_fn, linkmtu_fn,
+              ryu_api_fn):
+            devex_fn.return_value = True
+            parent = mock.MagicMock()
+            parent.attach_mock(utilsexec_fn, 'utils_execute')
+            parent.attach_mock(linkdel_fn, 'link_delete')
+            parent.attach_mock(addveth_fn, 'add_veth')
+            addveth_fn.return_value = (ip_lib.IPDevice("int-br-eth1"),
+                                       ip_lib.IPDevice("phy-br-eth1"))
+            ovs_addport_fn.return_value = "25"
+            br_addport_fn.return_value = "11"
+            self.agent.setup_physical_bridges({"physnet1": "br-eth"})
+            expected_calls = [mock.call.link_delete(),
+                              mock.call.utils_execute(['/sbin/udevadm',
+                                                       'settle',
+                                                       '--timeout=10']),
+                              mock.call.add_veth('int-br-eth',
+                                                 'phy-br-eth')]
+            parent.assert_has_calls(expected_calls, any_order=False)
+            self.assertEqual(self.agent.int_ofports["physnet1"],
+                             "11")
+            self.assertEqual(self.agent.phys_ofports["physnet1"],
+                             "25")
+
+    def test_port_unbound(self):
+        with mock.patch.object(self.agent, "reclaim_local_vlan") as reclvl_fn:
+            self.agent.enable_tunneling = True
+            lvm = mock.Mock()
+            lvm.network_type = "gre"
+            lvm.vif_ports = {"vif1": mock.Mock()}
+            self.agent.local_vlan_map["netuid12345"] = lvm
+            self.agent.port_unbound("vif1", "netuid12345")
+            self.assertTrue(reclvl_fn.called)
+            reclvl_fn.called = False
+
+            lvm.vif_ports = {}
+            self.agent.port_unbound("vif1", "netuid12345")
+            self.assertEqual(reclvl_fn.call_count, 2)
+
+            lvm.vif_ports = {"vif1": mock.Mock()}
+            self.agent.port_unbound("vif3", "netuid12345")
+            self.assertEqual(reclvl_fn.call_count, 2)
+
+    def _check_ovs_vxlan_version(self, installed_usr_version,
+                                 installed_klm_version,
+                                 expecting_ok):
+        with mock.patch(
+                'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
+        ) as klm_cmd:
+            with mock.patch(
+                'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version'
+            ) as usr_cmd:
+                try:
+                    klm_cmd.return_value = installed_klm_version
+                    usr_cmd.return_value = installed_usr_version
+                    self.agent.tunnel_types = 'vxlan'
+                    self.agent._check_ovs_version()
+                    version_ok = True
+                except SystemExit as e:
+                    self.assertEqual(e.code, 1)
+                    version_ok = False
+            self.assertEqual(version_ok, expecting_ok)
+
+    def test_check_minimum_version(self):
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
+                                      expecting_ok=True)
+
+    def test_check_future_version(self):
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
+                                      expecting_ok=True)
+
+    def test_check_fail_version(self):
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
+                                      expecting_ok=False)
+
+    def test_check_fail_no_version(self):
+        self._check_ovs_vxlan_version(None, None,
+                                      expecting_ok=False)
+
+    def test_check_fail_klm_version(self):
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        install_ver = str(float(min_vxlan_ver) - 0.01)
+        self._check_ovs_vxlan_version(min_vxlan_ver, install_ver,
+                                      expecting_ok=False)
+
+    def test_daemon_loop_uses_polling_manager(self):
+        with mock.patch(
+            'neutron.agent.linux.polling.get_polling_manager'
+        ) as mock_get_pm:
+            fake_pm = mock.Mock()
+            mock_get_pm.return_value = fake_pm
+            fake_pm.__enter__ = mock.Mock()
+            fake_pm.__exit__ = mock.Mock()
+            with mock.patch.object(
+                self.agent, 'ovsdb_monitor_loop'
+            ) as mock_loop:
+                self.agent.daemon_loop()
+        mock_get_pm.assert_called_once_with(True, 'fake_helper',
+                                            constants.DEFAULT_OVSDBMON_RESPAWN)
+        mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
+
+    def test_setup_tunnel_port_error_negative(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
+                              return_value='-1'),
+            mock.patch.object(self.mod_agent.LOG, 'error')
+        ) as (add_tunnel_port_fn, log_error_fn):
+            ofport = self.agent.setup_tunnel_port(
+                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+            add_tunnel_port_fn.assert_called_once_with(
+                'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
+                self.agent.vxlan_udp_port)
+            log_error_fn.assert_called_once_with(
+                _("Failed to set-up %(type)s tunnel port to %(ip)s"),
+                {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
+            self.assertEqual(ofport, 0)
+
+    def test_setup_tunnel_port_error_not_int(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
+                              return_value=None),
+            mock.patch.object(self.mod_agent.LOG, 'exception'),
+            mock.patch.object(self.mod_agent.LOG, 'error')
+        ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
+            ofport = self.agent.setup_tunnel_port(
+                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+            add_tunnel_port_fn.assert_called_once_with(
+                'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
+                self.agent.vxlan_udp_port)
+            log_exc_fn.assert_called_once_with(
+                _("ofport should have a value that can be "
+                  "interpreted as an integer"))
+            log_error_fn.assert_called_once_with(
+                _("Failed to set-up %(type)s tunnel port to %(ip)s"),
+                {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
+            self.assertEqual(ofport, 0)
+
+
+class AncillaryBridgesTest(OFAAgentTestCase):
+
+    def setUp(self):
+        super(AncillaryBridgesTest, self).setUp()
+        notifier_p = mock.patch(NOTIFIER)
+        notifier_cls = notifier_p.start()
+        self.notifier = mock.Mock()
+        notifier_cls.return_value = self.notifier
+        # Avoid rpc initialization for unit tests
+        cfg.CONF.set_override('rpc_backend',
+                              'neutron.openstack.common.rpc.impl_fake')
+        cfg.CONF.set_override('report_interval', 0, 'AGENT')
+        self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
+
+    def _test_ancillary_bridges(self, bridges, ancillary):
+        device_ids = ancillary[:]
+
+        def pullup_side_effect(self, *args):
+            result = device_ids.pop(0)
+            return result
+
+        with contextlib.nested(
+            mock.patch.object(self.mod_agent.OFANeutronAgent,
+                              'setup_integration_br',
+                              return_value=mock.Mock()),
+            mock.patch('neutron.agent.linux.utils.get_interface_mac',
+                       return_value='00:00:00:00:00:01'),
+            mock.patch.object(self.mod_agent.OVSBridge,
+                              'get_local_port_mac',
+                              return_value='00:00:00:00:00:01'),
+            mock.patch('neutron.agent.linux.ovs_lib.get_bridges',
+                       return_value=bridges),
+            mock.patch(
+                'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id',
+                side_effect=pullup_side_effect)):
+            self.agent = self.mod_agent.OFANeutronAgent(
+                self.ryuapp, **self.kwargs)
+            self.assertEqual(len(ancillary), len(self.agent.ancillary_brs))
+            if ancillary:
+                bridges = [br.br_name for br in self.agent.ancillary_brs]
+                for br in ancillary:
+                    self.assertIn(br, bridges)
+
+    def test_ancillary_bridges_single(self):
+        bridges = ['br-int', 'br-ex']
+        self._test_ancillary_bridges(bridges, ['br-ex'])
+
+    def test_ancillary_bridges_none(self):
+        bridges = ['br-int']
+        self._test_ancillary_bridges(bridges, [])
+
+    def test_ancillary_bridges_multiple(self):
+        bridges = ['br-int', 'br-ex1', 'br-ex2']
+        self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
index a556cf74f6b4a8427252643f5304278b96376c42..c3334279aa1de0d2a74daa8245ddb7e823503ff0 100644 (file)
@@ -20,6 +20,7 @@ from neutron.agent.linux import ovs_lib
 from neutron.agent.linux import utils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import uuidutils
+from neutron.plugins.openvswitch.common import constants
 from neutron.tests import base
 from neutron.tests import tools
 
@@ -131,6 +132,37 @@ class OVS_Lib_Test(base.BaseTestCase):
         # test __str__
         str(port)
 
+    def test_set_controller(self):
+        controller_names = ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555']
+        self.br.set_controller(controller_names)
+        self.execute.assert_called_once_with(
+            ['ovs-vsctl', self.TO, '--', 'set-controller', self.BR_NAME,
+             'tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'],
+            root_helper=self.root_helper)
+
+    def test_del_controller(self):
+        self.br.del_controller()
+        self.execute.assert_called_once_with(
+            ['ovs-vsctl', self.TO, '--', 'del-controller', self.BR_NAME],
+            root_helper=self.root_helper)
+
+    def test_get_controller(self):
+        self.execute.return_value = 'tcp:127.0.0.1:6633\ntcp:172.17.16.10:5555'
+        names = self.br.get_controller()
+        self.assertEqual(names,
+                         ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'])
+        self.execute.assert_called_once_with(
+            ['ovs-vsctl', self.TO, '--', 'get-controller', self.BR_NAME],
+            root_helper=self.root_helper)
+
+    def test_set_protocols(self):
+        protocols = 'OpenFlow13'
+        self.br.set_protocols(protocols)
+        self.execute.assert_called_once_with(
+            ['ovs-vsctl', self.TO, '--', 'set', 'bridge', self.BR_NAME,
+             "protocols=%s" % protocols],
+            root_helper=self.root_helper)
+
     def test_create(self):
         self.br.add_bridge(self.BR_NAME)
 
@@ -666,3 +698,47 @@ class OVS_Lib_Test(base.BaseTestCase):
         data = [[["map", external_ids], "tap99", 1]]
         self.assertIsNone(self._test_get_vif_port_by_id('tap99id', data,
                                                         "br-ext"))
+
+    def _check_ovs_vxlan_version(self, installed_usr_version,
+                                 installed_klm_version,
+                                 expecting_ok):
+        with mock.patch(
+                'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
+        ) as klm_cmd:
+            with mock.patch(
+                'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version'
+            ) as usr_cmd:
+                try:
+                    klm_cmd.return_value = installed_klm_version
+                    usr_cmd.return_value = installed_usr_version
+                    ovs_lib.check_ovs_vxlan_version(root_helper='sudo')
+                    version_ok = True
+                except SystemError:
+                    version_ok = False
+            self.assertEqual(version_ok, expecting_ok)
+
+    def test_check_minimum_version(self):
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
+                                      expecting_ok=True)
+
+    def test_check_future_version(self):
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
+                                      expecting_ok=True)
+
+    def test_check_fail_version(self):
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
+                                      expecting_ok=False)
+
+    def test_check_fail_no_version(self):
+        self._check_ovs_vxlan_version(None, None,
+                                      expecting_ok=False)
+
+    def test_check_fail_klm_version(self):
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        install_ver = str(float(min_vxlan_ver) - 0.01)
+        self._check_ovs_vxlan_version(min_vxlan_ver,
+                                      install_ver,
+                                      expecting_ok=False)
index 959101cab143c13a7cb42b526c760c04c27580a8..0216308467a625748f23f55b5b33e270e57378f8 100644 (file)
@@ -468,7 +468,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             self.assertEqual(reclvl_fn.call_count, 2)
 
     def _check_ovs_vxlan_version(self, installed_usr_version,
-                                 installed_klm_version, min_vers,
+                                 installed_klm_version,
                                  expecting_ok):
         with mock.patch(
                 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
@@ -480,8 +480,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                     klm_cmd.return_value = installed_klm_version
                     usr_cmd.return_value = installed_usr_version
                     self.agent.tunnel_types = 'vxlan'
-                    ovs_neutron_agent.check_ovs_version(min_vers,
-                                                        root_helper='sudo')
+                    self.agent._check_ovs_version()
                     version_ok = True
                 except SystemExit as e:
                     self.assertEqual(e.code, 1)
@@ -489,28 +488,28 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             self.assertEqual(version_ok, expecting_ok)
 
     def test_check_minimum_version(self):
-        self._check_ovs_vxlan_version('1.10', '1.10',
-                                      constants.MINIMUM_OVS_VXLAN_VERSION,
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
                                       expecting_ok=True)
 
     def test_check_future_version(self):
-        self._check_ovs_vxlan_version('1.11', '1.11',
-                                      constants.MINIMUM_OVS_VXLAN_VERSION,
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
                                       expecting_ok=True)
 
     def test_check_fail_version(self):
-        self._check_ovs_vxlan_version('1.9', '1.9',
-                                      constants.MINIMUM_OVS_VXLAN_VERSION,
+        install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+        self._check_ovs_vxlan_version(install_ver, install_ver,
                                       expecting_ok=False)
 
     def test_check_fail_no_version(self):
         self._check_ovs_vxlan_version(None, None,
-                                      constants.MINIMUM_OVS_VXLAN_VERSION,
                                       expecting_ok=False)
 
     def test_check_fail_klm_version(self):
-        self._check_ovs_vxlan_version('1.10', '1.9',
-                                      constants.MINIMUM_OVS_VXLAN_VERSION,
+        min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+        install_ver = str(float(min_vxlan_ver) - 0.01)
+        self._check_ovs_vxlan_version(min_vxlan_ver, install_ver,
                                       expecting_ok=False)
 
     def _prepare_l2_pop_ofports(self):
index 56b98aeaff8646eead5e80eb5801cb183525a015..75a12e81c7d56b9762c779a60cf05532e24d720e 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -60,6 +60,7 @@ data_files =
         etc/neutron/plugins/ml2/ml2_conf_arista.ini
         etc/neutron/plugins/ml2/ml2_conf_cisco.ini
         etc/neutron/plugins/bigswitch/restproxy.ini
+        etc/neutron/plugins/ml2/ml2_conf_ofa.ini
     etc/neutron/plugins/mlnx = etc/neutron/plugins/mlnx/mlnx_conf.ini
     etc/neutron/plugins/nec = etc/neutron/plugins/nec/nec.ini
     etc/neutron/plugins/nicira = etc/neutron/plugins/nicira/nvp.ini
@@ -125,6 +126,7 @@ console_scripts =
     quantum-rootwrap = oslo.rootwrap.cmd:main
     quantum-usage-audit = neutron.cmd.usage_audit:main
     neutron-metering-agent = neutron.services.metering.agents.metering_agent:main
+    neutron-ofagent-agent = ryu.cmd.ofa_neutron_agent:main
 neutron.core_plugins =
     bigswitch = neutron.plugins.bigswitch.plugin:NeutronRestProxyV2
     brocade = neutron.plugins.brocade.NeutronPlugin:BrocadePluginV2
@@ -167,6 +169,7 @@ neutron.ml2.mechanism_drivers =
     cisco_nexus = neutron.plugins.ml2.drivers.cisco.nexus.mech_cisco_nexus:CiscoNexusMechanismDriver
     l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
     bigswitch = neutron.plugins.ml2.drivers.mech_bigswitch.driver:BigSwitchMechanismDriver
+    ofagent = neutron.plugins.ml2.drivers.mech_ofagent:OfagentMechanismDriver
 neutron.openstack.common.cache.backends =
     memory = neutron.openstack.common.cache._backends.memory:MemoryBackend