--- /dev/null
+# Defines configuration options specific to the OpenFlow Agent Mechanism Driver
+
+[ovs]
+# Please refer to configuration options to the OpenvSwitch
+
+[agent]
+# (IntOpt) Number of seconds to retry acquiring an Open vSwitch datapath.
+# This is an optional parameter, default value is 60 seconds.
+#
+# get_datapath_retry_times =
+# Example: get_datapath_retry_times = 30
+
+# Please refer to configuration options to the OpenvSwitch else the above.
# License for the specific language governing permissions and limitations
# under the License.
+import distutils.version as dist_version
import re
from oslo.config import cfg
self.defer_apply_flows = False
self.deferred_flows = {'add': '', 'mod': '', 'del': ''}
+ def set_controller(self, controller_names):
+ vsctl_command = ['--', 'set-controller', self.br_name]
+ vsctl_command.extend(controller_names)
+ self.run_vsctl(vsctl_command, check_error=True)
+
+ def del_controller(self):
+ self.run_vsctl(['--', 'del-controller', self.br_name],
+ check_error=True)
+
+ def get_controller(self):
+ res = self.run_vsctl(['--', 'get-controller', self.br_name],
+ check_error=True)
+ if res:
+ return res.strip().split('\n')
+ return res
+
+ def set_protocols(self, protocols):
+ self.run_vsctl(['--', 'set', 'bridge', self.br_name,
+ "protocols=%s" % protocols],
+ check_error=True)
+
def create(self):
self.add_bridge(self.br_name)
ofport = data[ofport_idx]
# ofport must be integer otherwise return None
if not isinstance(ofport, int) or ofport == -1:
- LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a"
+ LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a "
"positive integer"), {'ofport': ofport,
'vif': port_id})
return
except Exception:
LOG.exception(_("Bridge %s not found."), bridge)
return None
+
+
+def _compare_installed_and_required_version(
+ installed_version, required_version, check_type, version_type):
+ if installed_version:
+ if dist_version.StrictVersion(
+ installed_version) < dist_version.StrictVersion(
+ required_version):
+ msg = (_('Failed %(ctype)s version check for Open '
+ 'vSwitch with %(vtype)s support. To use '
+ '%(vtype)s tunnels with OVS, please ensure '
+ 'the OVS version is %(required)s or newer!') %
+ {'ctype': check_type, 'vtype': version_type,
+ 'required': required_version})
+ raise SystemError(msg)
+ else:
+ msg = (_('Unable to determine %(ctype)s version for Open '
+ 'vSwitch with %(vtype)s support. To use '
+ '%(vtype)s tunnels with OVS, please ensure '
+ 'that the version is %(required)s or newer!') %
+ {'ctype': check_type, 'vtype': version_type,
+ 'required': required_version})
+ raise SystemError(msg)
+
+
+def check_ovs_vxlan_version(root_helper):
+ min_required_version = constants.MINIMUM_OVS_VXLAN_VERSION
+ installed_klm_version = get_installed_ovs_klm_version()
+ installed_usr_version = get_installed_ovs_usr_version(root_helper)
+ LOG.debug(_("Checking OVS version for VXLAN support "
+ "installed klm version is %s "), installed_klm_version)
+ LOG.debug(_("Checking OVS version for VXLAN support "
+ "installed usr version is %s"), installed_usr_version)
+ # First check the userspace version
+ _compare_installed_and_required_version(installed_usr_version,
+ min_required_version,
+ 'userspace', 'VXLAN')
+ # Now check the kernel version
+ _compare_installed_and_required_version(installed_klm_version,
+ min_required_version,
+ 'kernel', 'VXLAN')
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_HYPERV = 'HyperV agent'
AGENT_TYPE_NEC = 'NEC plugin agent'
+AGENT_TYPE_OFA = 'OFA driver agent'
AGENT_TYPE_L3 = 'L3 agent'
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
AGENT_TYPE_MLNX = 'Mellanox plugin agent'
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on openvswitch mechanism driver.
+#
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from neutron.common import constants
+from neutron.extensions import portbindings
+from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.ml2 import driver_api as api
+from neutron.plugins.ml2.drivers import mech_agent
+
+LOG = log.getLogger(__name__)
+
+
+class OfagentMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase):
+ """Attach to networks using ofagent L2 agent.
+
+ The OfagentMechanismDriver integrates the ml2 plugin with the
+ ofagent L2 agent. Port binding with this driver requires the
+ ofagent agent to be running on the port's host, and that agent
+ to have connectivity to at least one segment of the port's
+ network.
+ """
+
+ def __init__(self):
+ super(OfagentMechanismDriver, self).__init__(
+ constants.AGENT_TYPE_OFA,
+ portbindings.VIF_TYPE_OVS,
+ {portbindings.CAP_PORT_FILTER: True})
+
+ def check_segment_for_agent(self, segment, agent):
+ mappings = agent['configurations'].get('bridge_mappings', {})
+ tunnel_types = agent['configurations'].get('tunnel_types', [])
+ LOG.debug(_("Checking segment: %(segment)s "
+ "for mappings: %(mappings)s "
+ "with tunnel_types: %(tunnel_types)s"),
+ {'segment': segment, 'mappings': mappings,
+ 'tunnel_types': tunnel_types})
+ network_type = segment[api.NETWORK_TYPE]
+ return (
+ network_type == p_const.TYPE_LOCAL or
+ network_type in tunnel_types or
+ (network_type in [p_const.TYPE_FLAT, p_const.TYPE_VLAN] and
+ segment[api.PHYSICAL_NETWORK] in mappings)
+ )
--- /dev/null
+This directory includes agent for OpenFlow Agent mechanism driver.
+
+# -- Installation
+
+For how to install/set up ML2 mechanism driver for OpenFlow Agent, please refer to
+https://github.com/osrg/ryu/wiki/OpenStack
+
+# -- Ryu General
+
+For general Ryu stuff, please refer to
+http://www.osrg.net/ryu/
+
+Ryu is available at github
+git://github.com/osrg/ryu.git
+https://github.com/osrg/ryu
+
+The mailing is at
+ryu-devel@lists.sourceforge.net
+https://lists.sourceforge.net/lists/listinfo/ryu-devel
+
+Enjoy!
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on openvswitch agent.
+#
+# Copyright 2011 Nicira Networks, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import time
+
+from oslo.config import cfg
+from ryu.app.ofctl import api as ryu_api
+from ryu.base import app_manager
+from ryu.lib import hub
+from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import ovs_lib
+from neutron.agent.linux import polling
+from neutron.agent.linux import utils
+from neutron.agent import rpc as agent_rpc
+from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.common import constants as n_const
+from neutron.common import topics
+from neutron.common import utils as n_utils
+from neutron import context
+from neutron.extensions import securitygroup as ext_sg
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.openstack.common.rpc import common as rpc_common
+from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.ofagent.common import config # noqa
+from neutron.plugins.openvswitch.common import constants
+
+
+LOG = logging.getLogger(__name__)
+
+# A placeholder for dead vlans.
+DEAD_VLAN_TAG = str(n_const.MAX_VLAN_TAG + 1)
+
+
+# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
+# attributes set).
+class LocalVLANMapping:
+ def __init__(self, vlan, network_type, physical_network, segmentation_id,
+ vif_ports=None):
+ if vif_ports is None:
+ vif_ports = {}
+ self.vlan = vlan
+ self.network_type = network_type
+ self.physical_network = physical_network
+ self.segmentation_id = segmentation_id
+ self.vif_ports = vif_ports
+ # set of tunnel ports on which packets should be flooded
+ self.tun_ofports = set()
+
+ def __str__(self):
+ return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
+ (self.vlan, self.network_type, self.physical_network,
+ self.segmentation_id))
+
+
+class Port(object):
+ """Represents a neutron port.
+
+ Class stores port data in a ORM-free way, so attributres are
+ still available even if a row has been deleted.
+ """
+
+ def __init__(self, p):
+ self.id = p.id
+ self.network_id = p.network_id
+ self.device_id = p.device_id
+ self.admin_state_up = p.admin_state_up
+ self.status = p.status
+
+ def __eq__(self, other):
+ """Compare only fields that will cause us to re-wire."""
+ try:
+ return (other and self.id == other.id
+ and self.admin_state_up == other.admin_state_up)
+ except Exception:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ return hash(self.id)
+
+
+class OVSBridge(ovs_lib.OVSBridge):
+ def __init__(self, br_name, root_helper, ryuapp):
+ super(OVSBridge, self).__init__(br_name, root_helper)
+ self.datapath_id = None
+ self.datapath = None
+ self.ofparser = None
+ self.ryuapp = ryuapp
+
+ def find_datapath_id(self):
+ self.datapath_id = self.get_datapath_id()
+
+ def get_datapath(self, retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
+ retry = 0
+ while self.datapath is None:
+ self.datapath = ryu_api.get_datapath(self.ryuapp,
+ int(self.datapath_id, 16))
+ retry += 1
+ if retry >= retry_max:
+ LOG.error(_('Agent terminated!: Failed to get a datapath.'))
+ raise SystemExit(1)
+ time.sleep(1)
+ self.ofparser = self.datapath.ofproto_parser
+
+ def setup_ofp(self, controller_names=None,
+ protocols='OpenFlow13',
+ retry_max=cfg.CONF.AGENT.get_datapath_retry_times):
+ if not controller_names:
+ host = cfg.CONF.ofp_listen_host
+ if not host:
+ # 127.0.0.1 is a default for agent style of controller
+ host = '127.0.0.1'
+ controller_names = ["tcp:%s:%d" % (host,
+ cfg.CONF.ofp_tcp_listen_port)]
+ try:
+ self.set_protocols(protocols)
+ self.set_controller(controller_names)
+ except RuntimeError:
+ LOG.exception(_("Agent terminated"))
+ raise SystemExit(1)
+ self.find_datapath_id()
+ self.get_datapath(retry_max)
+
+
+class OFAPluginApi(agent_rpc.PluginApi,
+ sg_rpc.SecurityGroupServerRpcApiMixin):
+ pass
+
+
+class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
+ def __init__(self, context, plugin_rpc, root_helper):
+ self.context = context
+ self.plugin_rpc = plugin_rpc
+ self.root_helper = root_helper
+ self.init_firewall()
+
+
+class OFANeutronAgentRyuApp(app_manager.RyuApp):
+ OFP_VERSIONS = [ryu_ofp13.OFP_VERSION]
+
+ def start(self):
+
+ super(OFANeutronAgentRyuApp, self).start()
+ return hub.spawn(self._agent_main, self)
+
+ def _agent_main(self, ryuapp):
+ cfg.CONF.register_opts(ip_lib.OPTS)
+
+ try:
+ agent_config = create_agent_config_map(cfg.CONF)
+ except ValueError:
+ LOG.exception(_("Agent failed to create agent config map"))
+ raise SystemExit(1)
+
+ is_xen_compute_host = ('rootwrap-xen-dom0' in
+ agent_config['root_helper'])
+ if is_xen_compute_host:
+ # Force ip_lib to always use the root helper to ensure that ip
+ # commands target xen dom0 rather than domU.
+ cfg.CONF.set_default('ip_lib_force_root', True)
+
+ agent = OFANeutronAgent(ryuapp, **agent_config)
+
+ # Start everything.
+ LOG.info(_("Agent initialized successfully, now running... "))
+ agent.daemon_loop()
+
+
+class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
+ """A agent for OpenFlow Agent ML2 mechanism driver.
+
+ OFANeutronAgent is a OpenFlow Agent agent for a ML2 plugin.
+ This is as a ryu application thread.
+ - An agent acts as an OpenFlow controller on each compute nodes.
+ - OpenFlow 1.3 (vendor agnostic unlike OVS extensions).
+ """
+
+ # history
+ # 1.0 Initial version
+ # 1.1 Support Security Group RPC
+ RPC_API_VERSION = '1.1'
+
+ def __init__(self, ryuapp, integ_br, tun_br, local_ip,
+ bridge_mappings, root_helper,
+ polling_interval, tunnel_types=None,
+ veth_mtu=None, l2_population=False,
+ minimize_polling=False,
+ ovsdb_monitor_respawn_interval=(
+ constants.DEFAULT_OVSDBMON_RESPAWN)):
+ """Constructor.
+
+ :param ryuapp: object of the ryu app.
+ :param integ_br: name of the integration bridge.
+ :param tun_br: name of the tunnel bridge.
+ :param local_ip: local IP address of this hypervisor.
+ :param bridge_mappings: mappings from physical network name to bridge.
+ :param root_helper: utility to use when running shell cmds.
+ :param polling_interval: interval (secs) to poll DB.
+ :param tunnel_types: A list of tunnel types to enable support for in
+ the agent. If set, will automatically set enable_tunneling to
+ True.
+ :param veth_mtu: MTU size for veth interfaces.
+ :param minimize_polling: Optional, whether to minimize polling by
+ monitoring ovsdb for interface changes.
+ :param ovsdb_monitor_respawn_interval: Optional, when using polling
+ minimization, the number of seconds to wait before respawning
+ the ovsdb monitor.
+ """
+ self.ryuapp = ryuapp
+ self.veth_mtu = veth_mtu
+ self.root_helper = root_helper
+ self.available_local_vlans = set(xrange(n_const.MIN_VLAN_TAG,
+ n_const.MAX_VLAN_TAG))
+ self.tunnel_types = tunnel_types or []
+ self.l2_pop = l2_population
+ self.agent_state = {
+ 'binary': 'neutron-ofa-agent',
+ 'host': cfg.CONF.host,
+ 'topic': n_const.L2_AGENT_TOPIC,
+ 'configurations': {'bridge_mappings': bridge_mappings,
+ 'tunnel_types': self.tunnel_types,
+ 'tunneling_ip': local_ip,
+ 'l2_population': self.l2_pop},
+ 'agent_type': n_const.AGENT_TYPE_OFA,
+ 'start_flag': True}
+
+ # Keep track of int_br's device count for use by _report_state()
+ self.int_br_device_count = 0
+
+ self.int_br = OVSBridge(integ_br, self.root_helper, self.ryuapp)
+ self.setup_rpc()
+ self.setup_integration_br()
+ self.setup_physical_bridges(bridge_mappings)
+ self.local_vlan_map = {}
+ self.tun_br_ofports = {p_const.TYPE_GRE: {},
+ p_const.TYPE_VXLAN: {}}
+
+ self.polling_interval = polling_interval
+ self.minimize_polling = minimize_polling
+ self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
+
+ self.enable_tunneling = bool(self.tunnel_types)
+ self.local_ip = local_ip
+ self.tunnel_count = 0
+ self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
+ self._check_ovs_version()
+ if self.enable_tunneling:
+ self.setup_tunnel_br(tun_br)
+ # Collect additional bridges to monitor
+ self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
+
+ # Security group agent support
+ self.sg_agent = OFASecurityGroupAgent(self.context,
+ self.plugin_rpc,
+ self.root_helper)
+ # Initialize iteration counter
+ self.iter_num = 0
+
+ def _check_ovs_version(self):
+ if p_const.TYPE_VXLAN in self.tunnel_types:
+ try:
+ ovs_lib.check_ovs_vxlan_version(self.root_helper)
+ except SystemError:
+ LOG.exception(_("Agent terminated"))
+ raise SystemExit(1)
+
+ def _report_state(self):
+ # How many devices are likely used by a VM
+ self.agent_state.get('configurations')['devices'] = (
+ self.int_br_device_count)
+ try:
+ self.state_rpc.report_state(self.context,
+ self.agent_state)
+ self.agent_state.pop('start_flag', None)
+ except Exception:
+ LOG.exception(_("Failed reporting state!"))
+
+ def ryu_send_msg(self, msg):
+ result = ryu_api.send_msg(self.ryuapp, msg)
+ LOG.info(_("ryu send_msg() result: %s"), result)
+
+ def setup_rpc(self):
+ mac = self.int_br.get_local_port_mac()
+ self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
+ self.topic = topics.AGENT
+ self.plugin_rpc = OFAPluginApi(topics.PLUGIN)
+ self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+
+ # RPC network init
+ self.context = context.get_admin_context_without_session()
+ # Handle updates from service
+ self.dispatcher = self.create_rpc_dispatcher()
+ # Define the listening consumers for the agent
+ consumers = [[topics.PORT, topics.UPDATE],
+ [topics.NETWORK, topics.DELETE],
+ [constants.TUNNEL, topics.UPDATE],
+ [topics.SECURITY_GROUP, topics.UPDATE]]
+ self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.topic,
+ consumers)
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.FixedIntervalLoopingCall(
+ self._report_state)
+ heartbeat.start(interval=report_interval)
+
+ def get_net_uuid(self, vif_id):
+ for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+ if vif_id in vlan_mapping.vif_ports:
+ return network_id
+
+ def network_delete(self, context, **kwargs):
+ network_id = kwargs.get('network_id')
+ LOG.debug(_("network_delete received network %s"), network_id)
+ # The network may not be defined on this agent
+ lvm = self.local_vlan_map.get(network_id)
+ if lvm:
+ self.reclaim_local_vlan(network_id)
+ else:
+ LOG.debug(_("Network %s not used on agent."), network_id)
+
+ def port_update(self, context, **kwargs):
+ port = kwargs.get('port')
+ LOG.debug(_("port_update received port %s"), port['id'])
+ # Validate that port is on OVS
+ vif_port = self.int_br.get_vif_port_by_id(port['id'])
+ if not vif_port:
+ return
+
+ if ext_sg.SECURITYGROUPS in port:
+ self.sg_agent.refresh_firewall()
+ network_type = kwargs.get('network_type')
+ segmentation_id = kwargs.get('segmentation_id')
+ physical_network = kwargs.get('physical_network')
+ self.treat_vif_port(vif_port, port['id'], port['network_id'],
+ network_type, physical_network,
+ segmentation_id, port['admin_state_up'])
+ try:
+ if port['admin_state_up']:
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context, port['id'],
+ self.agent_id,
+ cfg.CONF.host)
+ else:
+ # update plugin about port status
+ self.plugin_rpc.update_device_down(self.context, port['id'],
+ self.agent_id,
+ cfg.CONF.host)
+ except rpc_common.Timeout:
+ LOG.error(_("RPC timeout while updating port %s"), port['id'])
+
+ def tunnel_update(self, context, **kwargs):
+ LOG.debug(_("tunnel_update received"))
+ if not self.enable_tunneling:
+ return
+ tunnel_ip = kwargs.get('tunnel_ip')
+ tunnel_id = kwargs.get('tunnel_id', tunnel_ip)
+ tunnel_type = kwargs.get('tunnel_type')
+ if not tunnel_type:
+ LOG.error(_("No tunnel_type specified, cannot create tunnels"))
+ return
+ if tunnel_type not in self.tunnel_types:
+ LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type)
+ return
+ if tunnel_ip == self.local_ip:
+ return
+ tun_name = '%s-%s' % (tunnel_type, tunnel_id)
+ self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
+
+ def create_rpc_dispatcher(self):
+ """Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ """
+ return dispatcher.RpcDispatcher([self])
+
+ def _provision_local_vlan_outbound_for_tunnel(self, lvid,
+ segmentation_id, ofports):
+ br = self.tun_br
+ match = br.ofparser.OFPMatch(
+ vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)
+ actions = [br.ofparser.OFPActionPopVlan(),
+ br.ofparser.OFPActionSetField(
+ tunnel_id=int(segmentation_id))]
+ for ofport in ofports:
+ actions.append(br.ofparser.OFPActionOutput(ofport, 0))
+ instructions = [br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+ msg = br.ofparser.OFPFlowMod(
+ br.datapath,
+ table_id=constants.FLOOD_TO_TUN,
+ priority=1,
+ match=match, instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,
+ segmentation_id):
+ br = self.tun_br
+ match = br.ofparser.OFPMatch(
+ tunnel_id=int(segmentation_id))
+ actions = [br.ofparser.OFPActionSetField(
+ vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)]
+ instructions = [
+ br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS, actions),
+ br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.LEARN_FROM_TUN)]
+ msg = br.ofparser.OFPFlowMod(
+ br.datapath,
+ table_id=constants.TUN_TABLE[network_type],
+ priority=1,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _local_vlan_for_tunnel(self, lvid, network_type, segmentation_id):
+ ofports = [int(ofport) for ofport in
+ self.tun_br_ofports[network_type].values()]
+ if ofports:
+ self._provision_local_vlan_outbound_for_tunnel(
+ lvid, segmentation_id, ofports)
+ self._provision_local_vlan_inbound_for_tunnel(lvid, network_type,
+ segmentation_id)
+
+ def _provision_local_vlan_outbound(self, br, lvid, actions,
+ physical_network):
+ match = br.ofparser.OFPMatch(
+ in_port=int(self.phys_ofports[physical_network]),
+ vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)
+ instructions = [br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ priority=4,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _provision_local_vlan_inbound(self, lvid, vlan_vid, physical_network):
+ match = self.int_br.ofparser.OFPMatch(
+ in_port=int(self.int_ofports[physical_network]),
+ vlan_vid=vlan_vid)
+ actions = [self.int_br.ofparser.OFPActionSetField(
+ vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT),
+ self.int_br.ofparser.OFPActionOutput(
+ ryu_ofp13.OFPP_NORMAL, 0)]
+ instructions = [self.int_br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)]
+ msg = self.int_br.ofparser.OFPFlowMod(
+ self.int_br.datapath,
+ priority=3,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _local_vlan_for_flat(self, lvid, physical_network):
+ br = self.phys_brs[physical_network]
+ actions = [br.ofparser.OFPActionPopVlan(),
+ br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+ self._provision_local_vlan_outbound(
+ br, lvid, actions, physical_network)
+ self._provision_local_vlan_inbound(lvid, 0xffff, physical_network)
+
+ def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id):
+ br = self.phys_brs[physical_network]
+ actions = [br.ofparser.OFPActionSetField(
+ vlan_vid=int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT),
+ br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+ self._provision_local_vlan_outbound(
+ br, lvid, actions, physical_network)
+ self._provision_local_vlan_inbound(
+ lvid, int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT,
+ physical_network)
+
+ def provision_local_vlan(self, net_uuid, network_type, physical_network,
+ segmentation_id):
+ """Provisions a local VLAN.
+
+ :param net_uuid: the uuid of the network associated with this vlan.
+ :param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat',
+ 'local')
+ :param physical_network: the physical network for 'vlan' or 'flat'
+ :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+ """
+
+ if not self.available_local_vlans:
+ LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
+ return
+ lvid = self.available_local_vlans.pop()
+ LOG.info(_("Assigning %(vlan_id)s as local vlan for "
+ "net-id=%(net_uuid)s"),
+ {'vlan_id': lvid, 'net_uuid': net_uuid})
+ self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
+ physical_network,
+ segmentation_id)
+
+ if network_type in constants.TUNNEL_NETWORK_TYPES:
+ if self.enable_tunneling:
+ self._local_vlan_for_tunnel(lvid, network_type,
+ segmentation_id)
+ else:
+ LOG.error(_("Cannot provision %(network_type)s network for "
+ "net-id=%(net_uuid)s - tunneling disabled"),
+ {'network_type': network_type,
+ 'net_uuid': net_uuid})
+ elif network_type == p_const.TYPE_FLAT:
+ if physical_network in self.phys_brs:
+ self._local_vlan_for_flat(lvid, physical_network)
+ else:
+ LOG.error(_("Cannot provision flat network for "
+ "net-id=%(net_uuid)s - no bridge for "
+ "physical_network %(physical_network)s"),
+ {'net_uuid': net_uuid,
+ 'physical_network': physical_network})
+ elif network_type == p_const.TYPE_VLAN:
+ if physical_network in self.phys_brs:
+ self._local_vlan_for_vlan(lvid, physical_network,
+ segmentation_id)
+ else:
+ LOG.error(_("Cannot provision VLAN network for "
+ "net-id=%(net_uuid)s - no bridge for "
+ "physical_network %(physical_network)s"),
+ {'net_uuid': net_uuid,
+ 'physical_network': physical_network})
+ elif network_type == p_const.TYPE_LOCAL:
+ # no flows needed for local networks
+ pass
+ else:
+ LOG.error(_("Cannot provision unknown network type "
+ "%(network_type)s for net-id=%(net_uuid)s"),
+ {'network_type': network_type,
+ 'net_uuid': net_uuid})
+
+ def _reclaim_local_vlan_outbound(self, lvm):
+ br = self.phys_brs[lvm.physical_network]
+ match = br.ofparser.OFPMatch(
+ in_port=self.phys_ofports[lvm.physical_network],
+ vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY,
+ match=match)
+ self.ryu_send_msg(msg)
+
+ def _reclaim_local_vlan_inbound(self, lvm, vlan_vid):
+ br = self.int_br
+ match = br.ofparser.OFPMatch(
+ in_port=self.int_ofports[lvm.physical_network],
+ vlan_vid=vlan_vid)
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY,
+ match=match)
+ self.ryu_send_msg(msg)
+
+ def reclaim_local_vlan(self, net_uuid):
+ """Reclaim a local VLAN.
+
+ :param net_uuid: the network uuid associated with this vlan.
+ :param lvm: a LocalVLANMapping object that tracks (vlan, lsw_id,
+ vif_ids) mapping.
+ """
+ lvm = self.local_vlan_map.pop(net_uuid, None)
+ if lvm is None:
+ LOG.debug(_("Network %s not used on agent."), net_uuid)
+ return
+
+ LOG.info(_("Reclaiming vlan = %(vlan_id)s from net-id = %(net_uuid)s"),
+ {'vlan_id': lvm.vlan,
+ 'net_uuid': net_uuid})
+
+ if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:
+ if self.enable_tunneling:
+ match = self.tun_br.ofparser.OFPMatch(
+ tunnel_id=int(lvm.segmentation_id))
+ msg = self.tun_br.ofparser.OFPFlowMod(
+ self.tun_br.datapath,
+ table_id=constants.TUN_TABLE[lvm.network_type],
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY,
+ match=match)
+ self.ryu_send_msg(msg)
+ match = self.tun_br.ofparser.OFPMatch(
+ vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT)
+ msg = self.tun_br.ofparser.OFPFlowMod(
+ self.tun_br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY,
+ match=match)
+ self.ryu_send_msg(msg)
+ elif lvm.network_type == p_const.TYPE_FLAT:
+ if lvm.physical_network in self.phys_brs:
+ self._reclaim_local_vlan_outbound(lvm)
+ self._reclaim_local_vlan_inbound(lvm, 0xffff)
+ elif lvm.network_type == p_const.TYPE_VLAN:
+ if lvm.physical_network in self.phys_brs:
+ self._reclaim_local_vlan_outbound(lvm)
+ self._reclaim_local_vlan_inbound(
+ lvm, lvm.segmentation_id | ryu_ofp13.OFPVID_PRESENT)
+ elif lvm.network_type == p_const.TYPE_LOCAL:
+ # no flows needed for local networks
+ pass
+ else:
+ LOG.error(_("Cannot reclaim unknown network type "
+ "%(network_type)s for net-id=%(net_uuid)s"),
+ {'network_type': lvm.network_type,
+ 'net_uuid': net_uuid})
+
+ self.available_local_vlans.add(lvm.vlan)
+
+ def port_bound(self, port, net_uuid,
+ network_type, physical_network, segmentation_id):
+ """Bind port to net_uuid/lsw_id and install flow for inbound traffic
+ to vm.
+
+ :param port: a ovs_lib.VifPort object.
+ :param net_uuid: the net_uuid this port is to be associated with.
+ :param network_type: the network type ('gre', 'vlan', 'flat', 'local')
+ :param physical_network: the physical network for 'vlan' or 'flat'
+ :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+ """
+ if net_uuid not in self.local_vlan_map:
+ self.provision_local_vlan(net_uuid, network_type,
+ physical_network, segmentation_id)
+ lvm = self.local_vlan_map[net_uuid]
+ lvm.vif_ports[port.vif_id] = port
+
+ self.int_br.set_db_attribute("Port", port.port_name, "tag",
+ str(lvm.vlan))
+ if int(port.ofport) != -1:
+ match = self.int_br.ofparser.OFPMatch(in_port=port.ofport)
+ msg = self.int_br.ofparser.OFPFlowMod(
+ self.int_br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY,
+ match=match)
+ self.ryu_send_msg(msg)
+
+ def port_unbound(self, vif_id, net_uuid=None):
+ """Unbind port.
+
+ Removes corresponding local vlan mapping object if this is its last
+ VIF.
+
+ :param vif_id: the id of the vif
+ :param net_uuid: the net_uuid this port is associated with.
+ """
+ net_uuid = net_uuid or self.get_net_uuid(vif_id)
+
+ if not self.local_vlan_map.get(net_uuid):
+ LOG.info(_('port_unbound() net_uuid %s not in local_vlan_map'),
+ net_uuid)
+ return
+
+ lvm = self.local_vlan_map[net_uuid]
+ lvm.vif_ports.pop(vif_id, None)
+
+ if not lvm.vif_ports:
+ self.reclaim_local_vlan(net_uuid)
+
+ def port_dead(self, port):
+ """Once a port has no binding, put it on the "dead vlan".
+
+ :param port: a ovs_lib.VifPort object.
+ """
+ self.int_br.set_db_attribute("Port", port.port_name, "tag",
+ DEAD_VLAN_TAG)
+ match = self.int_br.ofparser.OFPMatch(in_port=int(port.ofport))
+ msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+ priority=2, match=match)
+ self.ryu_send_msg(msg)
+
+ def setup_integration_br(self):
+ """Setup the integration bridge.
+
+ Create patch ports and remove all existing flows.
+
+ :param bridge_name: the name of the integration bridge.
+ :returns: the integration bridge
+ """
+ self.int_br.setup_ofp()
+ self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port)
+ msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY)
+ self.ryu_send_msg(msg)
+ # switch all traffic using L2 learning
+ actions = [self.int_br.ofparser.OFPActionOutput(
+ ryu_ofp13.OFPP_NORMAL, 0)]
+ instructions = [self.int_br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS,
+ actions)]
+ msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath,
+ priority=1,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def setup_ancillary_bridges(self, integ_br, tun_br):
+ """Setup ancillary bridges - for example br-ex."""
+ ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))
+ # Remove all known bridges
+ ovs_bridges.remove(integ_br)
+ if self.enable_tunneling:
+ ovs_bridges.remove(tun_br)
+ br_names = [self.phys_brs[physical_network].br_name for
+ physical_network in self.phys_brs]
+ ovs_bridges.difference_update(br_names)
+ # Filter list of bridges to those that have external
+ # bridge-id's configured
+ br_names = [
+ bridge for bridge in ovs_bridges
+ if bridge != ovs_lib.get_bridge_external_bridge_id(
+ self.root_helper, bridge)
+ ]
+ ovs_bridges.difference_update(br_names)
+ ancillary_bridges = []
+ for bridge in ovs_bridges:
+ br = OVSBridge(bridge, self.root_helper, self.ryuapp)
+ ancillary_bridges.append(br)
+ LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges)
+ return ancillary_bridges
+
+ def _tun_br_sort_incoming_traffic_depend_in_port(self, br):
+ match = br.ofparser.OFPMatch(
+ in_port=int(self.patch_int_ofport))
+ instructions = [br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.PATCH_LV_TO_TUN)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ priority=1,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+ msg = br.ofparser.OFPFlowMod(br.datapath, priority=0)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_goto_table_ucast_unicast(self, br):
+ match = br.ofparser.OFPMatch(eth_dst=('00:00:00:00:00:00',
+ '01:00:00:00:00:00'))
+ instructions = [br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.UCAST_TO_TUN)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=constants.PATCH_LV_TO_TUN,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_goto_table_flood_broad_multi_cast(self, br):
+ match = br.ofparser.OFPMatch(eth_dst=('01:00:00:00:00:00',
+ '01:00:00:00:00:00'))
+ instructions = [br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.FLOOD_TO_TUN)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=constants.PATCH_LV_TO_TUN,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_set_table_tun_by_tunnel_type(self, br):
+ for tunnel_type in constants.TUNNEL_NETWORK_TYPES:
+ msg = br.ofparser.OFPFlowMod(
+ br.datapath,
+ table_id=constants.TUN_TABLE[tunnel_type],
+ priority=0)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_output_patch_int(self, br):
+ actions = [br.ofparser.OFPActionOutput(
+ int(self.patch_int_ofport), 0)]
+ instructions = [br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS,
+ actions)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=constants.LEARN_FROM_TUN,
+ priority=1,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_goto_table_flood_unknown_unicast(self, br):
+ instructions = [br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.FLOOD_TO_TUN)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=constants.UCAST_TO_TUN,
+ priority=0,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ def _tun_br_default_drop(self, br):
+ msg = br.ofparser.OFPFlowMod(
+ br.datapath,
+ table_id=constants.FLOOD_TO_TUN,
+ priority=0)
+ self.ryu_send_msg(msg)
+
+ def setup_tunnel_br(self, tun_br):
+ """Setup the tunnel bridge.
+
+ Creates tunnel bridge, and links it to the integration bridge
+ using a patch port.
+
+ :param tun_br: the name of the tunnel bridge.
+ """
+ self.tun_br = OVSBridge(tun_br, self.root_helper, self.ryuapp)
+ self.tun_br.reset_bridge()
+ self.tun_br.setup_ofp()
+ self.patch_tun_ofport = self.int_br.add_patch_port(
+ cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
+ self.patch_int_ofport = self.tun_br.add_patch_port(
+ cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
+ if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
+ LOG.error(_("Failed to create OVS patch port. Cannot have "
+ "tunneling enabled on this agent, since this version "
+ "of OVS does not support tunnels or patch ports. "
+ "Agent terminated!"))
+ raise SystemExit(1)
+ msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY)
+ self.ryu_send_msg(msg)
+
+ self._tun_br_sort_incoming_traffic_depend_in_port(self.tun_br)
+ self._tun_br_goto_table_ucast_unicast(self.tun_br)
+ self._tun_br_goto_table_flood_broad_multi_cast(self.tun_br)
+ self._tun_br_set_table_tun_by_tunnel_type(self.tun_br)
+ self._tun_br_output_patch_int(self.tun_br)
+ self._tun_br_goto_table_flood_unknown_unicast(self.tun_br)
+ self._tun_br_default_drop(self.tun_br)
+
+ def _phys_br_prepare_create_veth(self, br, int_veth_name, phys_veth_name):
+ self.int_br.delete_port(int_veth_name)
+ br.delete_port(phys_veth_name)
+ if ip_lib.device_exists(int_veth_name, self.root_helper):
+ ip_lib.IPDevice(int_veth_name, self.root_helper).link.delete()
+ # Give udev a chance to process its rules here, to avoid
+ # race conditions between commands launched by udev rules
+ # and the subsequent call to ip_wrapper.add_veth
+ utils.execute(['/sbin/udevadm', 'settle', '--timeout=10'])
+
+ def _phys_br_create_veth(self, br, int_veth_name,
+ phys_veth_name, physical_network, ip_wrapper):
+ int_veth, phys_veth = ip_wrapper.add_veth(int_veth_name,
+ phys_veth_name)
+ self.int_ofports[physical_network] = self.int_br.add_port(int_veth)
+ self.phys_ofports[physical_network] = br.add_port(phys_veth)
+ return (int_veth, phys_veth)
+
+ def _phys_br_block_untranslated_traffic(self, br, physical_network):
+ match = br.ofparser.OFPMatch(in_port=int(
+ self.int_ofports[physical_network]))
+ msg = br.ofparser.OFPFlowMod(self.int_br.datapath,
+ priority=2, match=match)
+ self.ryu_send_msg(msg)
+ match = br.ofparser.OFPMatch(in_port=int(
+ self.phys_ofports[physical_network]))
+ msg = br.ofparser.OFPFlowMod(br.datapath, priority=2, match=match)
+ self.ryu_send_msg(msg)
+
+ def _phys_br_enable_veth_to_pass_traffic(self, int_veth, phys_veth):
+ # enable veth to pass traffic
+ int_veth.link.set_up()
+ phys_veth.link.set_up()
+
+ if self.veth_mtu:
+ # set up mtu size for veth interfaces
+ int_veth.link.set_mtu(self.veth_mtu)
+ phys_veth.link.set_mtu(self.veth_mtu)
+
+ def _phys_br_patch_physical_bridge_with_integration_bridge(
+ self, br, physical_network, bridge, ip_wrapper):
+ int_veth_name = constants.VETH_INTEGRATION_PREFIX + bridge
+ phys_veth_name = constants.VETH_PHYSICAL_PREFIX + bridge
+ self._phys_br_prepare_create_veth(br, int_veth_name, phys_veth_name)
+ int_veth, phys_veth = self._phys_br_create_veth(br, int_veth_name,
+ phys_veth_name,
+ physical_network,
+ ip_wrapper)
+ self._phys_br_block_untranslated_traffic(br, physical_network)
+ self._phys_br_enable_veth_to_pass_traffic(int_veth, phys_veth)
+
+ def setup_physical_bridges(self, bridge_mappings):
+ """Setup the physical network bridges.
+
+ Creates physical network bridges and links them to the
+ integration bridge using veths.
+
+ :param bridge_mappings: map physical network names to bridge names.
+ """
+ self.phys_brs = {}
+ self.int_ofports = {}
+ self.phys_ofports = {}
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper)
+ for physical_network, bridge in bridge_mappings.iteritems():
+ LOG.info(_("Mapping physical network %(physical_network)s to "
+ "bridge %(bridge)s"),
+ {'physical_network': physical_network,
+ 'bridge': bridge})
+ # setup physical bridge
+ if not ip_lib.device_exists(bridge, self.root_helper):
+ LOG.error(_("Bridge %(bridge)s for physical network "
+ "%(physical_network)s does not exist. Agent "
+ "terminated!"),
+ {'physical_network': physical_network,
+ 'bridge': bridge})
+ raise SystemExit(1)
+ br = OVSBridge(bridge, self.root_helper, self.ryuapp)
+ br.setup_ofp()
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ table_id=ryu_ofp13.OFPTT_ALL,
+ command=ryu_ofp13.OFPFC_DELETE,
+ out_group=ryu_ofp13.OFPG_ANY,
+ out_port=ryu_ofp13.OFPP_ANY)
+ self.ryu_send_msg(msg)
+ actions = [br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)]
+ instructions = [br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS,
+ actions)]
+ msg = br.ofparser.OFPFlowMod(br.datapath,
+ priority=1,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+ self.phys_brs[physical_network] = br
+
+ self._phys_br_patch_physical_bridge_with_integration_bridge(
+ br, physical_network, bridge, ip_wrapper)
+
+ def update_ports(self, registered_ports):
+ ports = self.int_br.get_vif_port_set()
+ if ports == registered_ports:
+ return
+ self.int_br_device_count = len(ports)
+ added = ports - registered_ports
+ removed = registered_ports - ports
+ return {'current': ports,
+ 'added': added,
+ 'removed': removed}
+
+ def update_ancillary_ports(self, registered_ports):
+ ports = set()
+ for bridge in self.ancillary_brs:
+ ports |= bridge.get_vif_port_set()
+
+ if ports == registered_ports:
+ return
+ added = ports - registered_ports
+ removed = registered_ports - ports
+ return {'current': ports,
+ 'added': added,
+ 'removed': removed}
+
+ def treat_vif_port(self, vif_port, port_id, network_id, network_type,
+ physical_network, segmentation_id, admin_state_up):
+ if vif_port:
+ if admin_state_up:
+ self.port_bound(vif_port, network_id, network_type,
+ physical_network, segmentation_id)
+ else:
+ self.port_dead(vif_port)
+ else:
+ LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
+
+ def setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
+ ofport = self.tun_br.add_tunnel_port(port_name,
+ remote_ip,
+ self.local_ip,
+ tunnel_type,
+ self.vxlan_udp_port)
+ ofport_int = -1
+ try:
+ ofport_int = int(ofport)
+ except (TypeError, ValueError):
+ LOG.exception(_("ofport should have a value that can be "
+ "interpreted as an integer"))
+ if ofport_int < 0:
+ LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"),
+ {'type': tunnel_type, 'ip': remote_ip})
+ return 0
+
+ self.tun_br_ofports[tunnel_type][remote_ip] = ofport
+ # Add flow in default table to resubmit to the right
+ # tunelling table (lvid will be set in the latter)
+ match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))
+ instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(
+ table_id=constants.TUN_TABLE[tunnel_type])]
+ msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
+ priority=1,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+
+ ofports = [int(p) for p in self.tun_br_ofports[tunnel_type].values()]
+ if ofports:
+ # Update flooding flows to include the new tunnel
+ for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+ if vlan_mapping.network_type == tunnel_type:
+ match = self.tun_br.ofparser.OFPMatch(
+ vlan_vid=int(vlan_mapping.vlan) |
+ ryu_ofp13.OFPVID_PRESENT)
+ actions = [
+ self.tun_br.ofparser.OFPActionPopVlan(),
+ self.tun_br.ofparser.OFPActionSetField(
+ tunnel_id=int(vlan_mapping.segmentation_id))]
+ actions.extend(
+ self.tun_br.ofparser.OFPActionOutput(p, 0)
+ for p in ofports
+ )
+ instructions = [
+ self.tun_br.ofparser.OFPInstructionActions(
+ ryu_ofp13.OFPIT_APPLY_ACTIONS,
+ actions)]
+ msg = self.tun_br.ofparser.OFPFlowMod(
+ self.tun_br.datapath,
+ table_id=constants.FLOOD_TO_TUN,
+ priority=1,
+ match=match,
+ instructions=instructions)
+ self.ryu_send_msg(msg)
+ return ofport
+
+ def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+ # Check if this tunnel port is still used
+ for lvm in self.local_vlan_map.values():
+ if tun_ofport in lvm.tun_ofports:
+ break
+ # If not, remove it
+ else:
+ for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items():
+ if ofport == tun_ofport:
+ port_name = '%s-%s' % (tunnel_type, remote_ip)
+ self.tun_br.delete_port(port_name)
+ self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
+
+ def treat_devices_added(self, devices):
+ resync = False
+ self.sg_agent.prepare_devices_filter(devices)
+ for device in devices:
+ LOG.info(_("Port %s added"), device)
+ try:
+ details = self.plugin_rpc.get_device_details(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug(_("Unable to get port details for "
+ "%(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+ port = self.int_br.get_vif_port_by_id(details['device'])
+ if 'port_id' in details:
+ LOG.info(_("Port %(device)s updated. Details: %(details)s"),
+ {'device': device, 'details': details})
+ self.treat_vif_port(port, details['port_id'],
+ details['network_id'],
+ details['network_type'],
+ details['physical_network'],
+ details['segmentation_id'],
+ details['admin_state_up'])
+
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context,
+ device,
+ self.agent_id,
+ cfg.CONF.host)
+ else:
+ LOG.debug(_("Device %s not defined on plugin"), device)
+ if (port and int(port.ofport) != -1):
+ self.port_dead(port)
+ return resync
+
+ def treat_ancillary_devices_added(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info(_("Ancillary Port %s added"), device)
+ try:
+ self.plugin_rpc.get_device_details(self.context, device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug(_("Unable to get port details for "
+ "%(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+
+ # update plugin about port status
+ self.plugin_rpc.update_device_up(self.context,
+ device,
+ self.agent_id,
+ cfg.CONF.host)
+ return resync
+
+ def treat_devices_removed(self, devices):
+ resync = False
+ self.sg_agent.remove_devices_filter(devices)
+ for device in devices:
+ LOG.info(_("Attachment %s removed"), device)
+ try:
+ self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id,
+ cfg.CONF.host)
+ except Exception as e:
+ LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+ self.port_unbound(device)
+ return resync
+
+ def treat_ancillary_devices_removed(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info(_("Attachment %s removed"), device)
+ try:
+ details = self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id,
+ cfg.CONF.host)
+ except Exception as e:
+ LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+ if details['exists']:
+ LOG.info(_("Port %s updated."), device)
+ # Nothing to do regarding local networking
+ else:
+ LOG.debug(_("Device %s not defined on plugin"), device)
+ return resync
+
+ def process_network_ports(self, port_info):
+ resync_add = False
+ resync_removed = False
+ if 'added' in port_info:
+ start = time.time()
+ resync_add = self.treat_devices_added(port_info['added'])
+ LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
+ "treat_devices_added completed in %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ if 'removed' in port_info:
+ start = time.time()
+ resync_removed = self.treat_devices_removed(port_info['removed'])
+ LOG.debug(_("process_network_ports - iteration:%(iter_num)d - "
+ "treat_devices_removed completed in %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ # If one of the above opertaions fails => resync with plugin
+ return (resync_add | resync_removed)
+
+ def process_ancillary_network_ports(self, port_info):
+ resync_add = False
+ resync_removed = False
+ if 'added' in port_info:
+ start = time.time()
+ resync_add = self.treat_ancillary_devices_added(port_info['added'])
+ LOG.debug(_("process_ancillary_network_ports - iteration: "
+ "%(iter_num)d - treat_ancillary_devices_added "
+ "completed in %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ if 'removed' in port_info:
+ start = time.time()
+ resync_removed = self.treat_ancillary_devices_removed(
+ port_info['removed'])
+ LOG.debug(_("process_ancillary_network_ports - iteration: "
+ "%(iter_num)d - treat_ancillary_devices_removed "
+ "completed in %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+
+ # If one of the above opertaions fails => resync with plugin
+ return (resync_add | resync_removed)
+
+ def tunnel_sync(self):
+ resync = False
+ try:
+ for tunnel_type in self.tunnel_types:
+ details = self.plugin_rpc.tunnel_sync(self.context,
+ self.local_ip,
+ tunnel_type)
+ tunnels = details['tunnels']
+ for tunnel in tunnels:
+ if self.local_ip != tunnel['ip_address']:
+ tunnel_id = tunnel.get('id', tunnel['ip_address'])
+ tun_name = '%s-%s' % (tunnel_type, tunnel_id)
+ self.setup_tunnel_port(tun_name,
+ tunnel['ip_address'],
+ tunnel_type)
+ except Exception as e:
+ LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
+ {'local_ip': self.local_ip, 'e': e})
+ resync = True
+ return resync
+
+ def ovsdb_monitor_loop(self, polling_manager=None):
+ if not polling_manager:
+ polling_manager = polling.AlwaysPoll()
+
+ sync = True
+ ports = set()
+ ancillary_ports = set()
+ tunnel_sync = True
+ while True:
+ try:
+ start = time.time()
+ port_stats = {'regular': {'added': 0, 'removed': 0},
+ 'ancillary': {'added': 0, 'removed': 0}}
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:%d started"),
+ self.iter_num)
+ if sync:
+ LOG.info(_("Agent out of sync with plugin!"))
+ ports.clear()
+ ancillary_ports.clear()
+ sync = False
+ polling_manager.force_polling()
+
+ # Notify the plugin of tunnel IP
+ if self.enable_tunneling and tunnel_sync:
+ LOG.info(_("Agent tunnel out of sync with plugin!"))
+ tunnel_sync = self.tunnel_sync()
+ if polling_manager.is_polling_required:
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:%(iter_num)d - "
+ "starting polling. Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ port_info = self.update_ports(ports)
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:%(iter_num)d - "
+ "port information retrieved. "
+ "Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ # notify plugin about port deltas
+ if port_info:
+ LOG.debug(_("Agent loop has new devices!"))
+ # If treat devices fails - must resync with plugin
+ sync = self.process_network_ports(port_info)
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:%(iter_num)d - "
+ "ports processed. Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ ports = port_info['current']
+ port_stats['regular']['added'] = (
+ len(port_info.get('added', [])))
+ port_stats['regular']['removed'] = (
+ len(port_info.get('removed', [])))
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ port_info = self.update_ancillary_ports(
+ ancillary_ports)
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:%(iter_num)d - "
+ "ancillary port info retrieved. "
+ "Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+
+ if port_info:
+ rc = self.process_ancillary_network_ports(
+ port_info)
+ LOG.debug(_("Agent ovsdb_monitor_loop - "
+ "iteration:"
+ "%(iter_num)d - ancillary ports "
+ "processed. Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ ancillary_ports = port_info['current']
+ port_stats['ancillary']['added'] = (
+ len(port_info.get('added', [])))
+ port_stats['ancillary']['removed'] = (
+ len(port_info.get('removed', [])))
+ sync = sync | rc
+
+ polling_manager.polling_completed()
+
+ except Exception:
+ LOG.exception(_("Error in agent event loop"))
+ sync = True
+ tunnel_sync = True
+
+ # sleep till end of polling interval
+ elapsed = (time.time() - start)
+ LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
+ "completed. Processed ports statistics:"
+ "%(port_stats)s. Elapsed:%(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'port_stats': port_stats,
+ 'elapsed': elapsed})
+ if (elapsed < self.polling_interval):
+ time.sleep(self.polling_interval - elapsed)
+ else:
+ LOG.debug(_("Loop iteration exceeded interval "
+ "(%(polling_interval)s vs. %(elapsed)s)!"),
+ {'polling_interval': self.polling_interval,
+ 'elapsed': elapsed})
+ self.iter_num = self.iter_num + 1
+
+ def daemon_loop(self):
+ with polling.get_polling_manager(
+ self.minimize_polling,
+ self.root_helper,
+ self.ovsdb_monitor_respawn_interval) as pm:
+
+ self.ovsdb_monitor_loop(polling_manager=pm)
+
+
+def create_agent_config_map(config):
+ """Create a map of agent config parameters.
+
+ :param config: an instance of cfg.CONF
+ :returns: a map of agent configuration parameters
+ """
+ try:
+ bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
+ except ValueError as e:
+ raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
+
+ kwargs = dict(
+ integ_br=config.OVS.integration_bridge,
+ tun_br=config.OVS.tunnel_bridge,
+ local_ip=config.OVS.local_ip,
+ bridge_mappings=bridge_mappings,
+ root_helper=config.AGENT.root_helper,
+ polling_interval=config.AGENT.polling_interval,
+ minimize_polling=config.AGENT.minimize_polling,
+ tunnel_types=config.AGENT.tunnel_types,
+ veth_mtu=config.AGENT.veth_mtu,
+ l2_population=False,
+ ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
+ )
+
+ # If enable_tunneling is TRUE, set tunnel_type to default to GRE
+ if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
+ kwargs['tunnel_types'] = [p_const.TYPE_GRE]
+
+ # Verify the tunnel_types specified are valid
+ for tun in kwargs['tunnel_types']:
+ if tun not in constants.TUNNEL_NETWORK_TYPES:
+ msg = _('Invalid tunnel type specificed: %s'), tun
+ raise ValueError(msg)
+ if not kwargs['local_ip']:
+ msg = _('Tunneling cannot be enabled without a valid local_ip.')
+ raise ValueError(msg)
+
+ return kwargs
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from oslo.config import cfg
+
+from neutron.agent.common import config
+from neutron.plugins.openvswitch.common import config as ovs_config
+
+
+agent_opts = [
+ cfg.IntOpt('get_datapath_retry_times', default=60,
+ help=_("Number of seconds to retry acquiring "
+ "an Open vSwitch datapath")),
+]
+
+
+cfg.CONF.register_opts(ovs_config.ovs_opts, 'OVS')
+cfg.CONF.register_opts(ovs_config.agent_opts, 'AGENT')
+cfg.CONF.register_opts(agent_opts, 'AGENT')
+config.register_agent_state_opts_helper(cfg.CONF)
+config.register_root_helper(cfg.CONF)
# License for the specific language governing permissions and limitations
# under the License.
-import distutils.version as dist_version
import signal
import sys
import time
def _check_ovs_version(self):
if p_const.TYPE_VXLAN in self.tunnel_types:
- check_ovs_version(constants.MINIMUM_OVS_VXLAN_VERSION,
- self.root_helper)
+ try:
+ ovs_lib.check_ovs_vxlan_version(self.root_helper)
+ except SystemError:
+ LOG.exception(_("Agent terminated"))
+ raise SystemExit(1)
def _report_state(self):
# How many devices are likely used by a VM
sys.exit(1)
-def check_ovs_version(min_required_version, root_helper):
- LOG.debug(_("Checking OVS version for VXLAN support"))
- installed_klm_version = ovs_lib.get_installed_ovs_klm_version()
- installed_usr_version = ovs_lib.get_installed_ovs_usr_version(root_helper)
- # First check the userspace version
- if installed_usr_version:
- if dist_version.StrictVersion(
- installed_usr_version) < dist_version.StrictVersion(
- min_required_version):
- LOG.error(_('Failed userspace version check for Open '
- 'vSwitch with VXLAN support. To use '
- 'VXLAN tunnels with OVS, please ensure '
- 'the OVS version is %s '
- 'or newer!'), min_required_version)
- sys.exit(1)
- # Now check the kernel version
- if installed_klm_version:
- if dist_version.StrictVersion(
- installed_klm_version) < dist_version.StrictVersion(
- min_required_version):
- LOG.error(_('Failed kernel version check for Open '
- 'vSwitch with VXLAN support. To use '
- 'VXLAN tunnels with OVS, please ensure '
- 'the OVS version is %s or newer!'),
- min_required_version)
- raise SystemExit(1)
- else:
- LOG.warning(_('Cannot determine kernel Open vSwitch version, '
- 'please ensure your Open vSwitch kernel module '
- 'is at least version %s to support VXLAN '
- 'tunnels.'), min_required_version)
- else:
- LOG.warning(_('Unable to determine Open vSwitch version. Please '
- 'ensure that its version is %s or newer to use VXLAN '
- 'tunnels with OVS.'), min_required_version)
- raise SystemExit(1)
-
-
def create_agent_config_map(config):
"""Create a map of agent config parameters.
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.common import constants
+from neutron.extensions import portbindings
+from neutron.plugins.ml2.drivers import mech_ofagent
+from neutron.tests.unit.ml2 import _test_mech_agent as base
+
+
+class OfagentMechanismBaseTestCase(base.AgentMechanismBaseTestCase):
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ CAP_PORT_FILTER = True
+ AGENT_TYPE = constants.AGENT_TYPE_OFA
+
+ GOOD_MAPPINGS = {'fake_physical_network': 'fake_bridge'}
+ GOOD_TUNNEL_TYPES = ['gre', 'vxlan']
+ GOOD_CONFIGS = {'bridge_mappings': GOOD_MAPPINGS,
+ 'tunnel_types': GOOD_TUNNEL_TYPES}
+
+ BAD_MAPPINGS = {'wrong_physical_network': 'wrong_bridge'}
+ BAD_TUNNEL_TYPES = ['bad_tunnel_type']
+ BAD_CONFIGS = {'bridge_mappings': BAD_MAPPINGS,
+ 'tunnel_types': BAD_TUNNEL_TYPES}
+
+ AGENTS = [{'alive': True,
+ 'configurations': GOOD_CONFIGS}]
+ AGENTS_DEAD = [{'alive': False,
+ 'configurations': GOOD_CONFIGS}]
+ AGENTS_BAD = [{'alive': False,
+ 'configurations': GOOD_CONFIGS},
+ {'alive': True,
+ 'configurations': BAD_CONFIGS}]
+
+ def setUp(self):
+ super(OfagentMechanismBaseTestCase, self).setUp()
+ self.driver = mech_ofagent.OfagentMechanismDriver()
+ self.driver.initialize()
+
+
+class OfagentMechanismGenericTestCase(OfagentMechanismBaseTestCase,
+ base.AgentMechanismGenericTestCase):
+ pass
+
+
+class OfagentMechanismLocalTestCase(OfagentMechanismBaseTestCase,
+ base.AgentMechanismLocalTestCase):
+ pass
+
+
+class OfagentMechanismFlatTestCase(OfagentMechanismBaseTestCase,
+ base.AgentMechanismFlatTestCase):
+ pass
+
+
+class OfagentMechanismVlanTestCase(OfagentMechanismBaseTestCase,
+ base.AgentMechanismVlanTestCase):
+ pass
+
+
+class OfagentMechanismGreTestCase(OfagentMechanismBaseTestCase,
+ base.AgentMechanismGreTestCase):
+ pass
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import mock
+
+
+def patch_fake_oflib_of():
+ ryu_mod = mock.Mock()
+ ryu_base_mod = ryu_mod.base
+ ryu_lib_mod = ryu_mod.lib
+ ryu_lib_hub = ryu_lib_mod.hub
+ ryu_ofproto_mod = ryu_mod.ofproto
+ ryu_ofproto_of13 = ryu_ofproto_mod.ofproto_v1_3
+ ryu_ofproto_of13.OFPTT_ALL = 0xff
+ ryu_ofproto_of13.OFPG_ANY = 0xffffffff
+ ryu_ofproto_of13.OFPP_ANY = 0xffffffff
+ ryu_ofproto_of13.OFPFC_ADD = 0
+ ryu_ofproto_of13.OFPFC_DELETE = 3
+ ryu_app_mod = ryu_mod.app
+ ryu_app_ofctl_mod = ryu_app_mod.ofctl
+ ryu_ofctl_api = ryu_app_ofctl_mod.api
+ return mock.patch.dict('sys.modules',
+ {'ryu': ryu_mod,
+ 'ryu.base': ryu_base_mod,
+ 'ryu.lib': ryu_lib_mod,
+ 'ryu.lib.hub': ryu_lib_hub,
+ 'ryu.ofproto': ryu_ofproto_mod,
+ 'ryu.ofproto.ofproto_v1_3': ryu_ofproto_of13,
+ 'ryu.app': ryu_app_mod,
+ 'ryu.app.ofctl': ryu_app_ofctl_mod,
+ 'ryu.app.ofctl.api': ryu_ofctl_api})
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+from oslo.config import cfg
+
+from neutron.plugins.ofagent.common import config # noqa
+from neutron.tests import base
+
+
+class ConfigurationTest(base.BaseTestCase):
+ """Configuration file Tests."""
+ def test_ml2_defaults(self):
+ self.assertEqual(60, cfg.CONF.AGENT.get_datapath_retry_times)
--- /dev/null
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# Based on test for openvswitch agent(test_ovs_neutron_agent.py).
+#
+# Copyright (c) 2012 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
+
+import contextlib
+
+import mock
+from oslo.config import cfg
+import testtools
+
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import utils
+from neutron.openstack.common import importutils
+from neutron.openstack.common.rpc import common as rpc_common
+from neutron.plugins.common import constants as p_const
+from neutron.plugins.openvswitch.common import constants
+from neutron.tests import base
+from neutron.tests.unit.ofagent import fake_oflib
+
+
+NOTIFIER = ('neutron.plugins.ml2.rpc.AgentNotifierApi')
+
+
+class OFAAgentTestCase(base.BaseTestCase):
+
+ _AGENT_NAME = 'neutron.plugins.ofagent.agent.ofa_neutron_agent'
+
+ def setUp(self):
+ super(OFAAgentTestCase, self).setUp()
+ self.addCleanup(mock.patch.stopall)
+ self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start()
+ self.mod_agent = importutils.import_module(self._AGENT_NAME)
+ self.ryuapp = mock.Mock()
+ cfg.CONF.register_cli_opts([
+ cfg.StrOpt('ofp-listen-host', default='',
+ help='openflow listen host'),
+ cfg.IntOpt('ofp-tcp-listen-port', default=6633,
+ help='openflow tcp listen port')
+ ])
+ cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT')
+
+
+class CreateAgentConfigMap(OFAAgentTestCase):
+
+ def test_create_agent_config_map_succeeds(self):
+ self.assertTrue(self.mod_agent.create_agent_config_map(cfg.CONF))
+
+ def test_create_agent_config_map_fails_for_invalid_tunnel_config(self):
+ # An ip address is required for tunneling but there is no default,
+ # verify this for both gre and vxlan tunnels.
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE],
+ group='AGENT')
+ with testtools.ExpectedException(ValueError):
+ self.mod_agent.create_agent_config_map(cfg.CONF)
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN],
+ group='AGENT')
+ with testtools.ExpectedException(ValueError):
+ self.mod_agent.create_agent_config_map(cfg.CONF)
+
+ def test_create_agent_config_map_enable_tunneling(self):
+ # Verify setting only enable_tunneling will default tunnel_type to GRE
+ cfg.CONF.set_override('tunnel_types', None, group='AGENT')
+ cfg.CONF.set_override('enable_tunneling', True, group='OVS')
+ cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
+ cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
+ self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE])
+
+ def test_create_agent_config_map_fails_no_local_ip(self):
+ # An ip address is required for tunneling but there is no default
+ cfg.CONF.set_override('enable_tunneling', True, group='OVS')
+ with testtools.ExpectedException(ValueError):
+ self.mod_agent.create_agent_config_map(cfg.CONF)
+
+ def test_create_agent_config_map_fails_for_invalid_tunnel_type(self):
+ cfg.CONF.set_override('tunnel_types', ['foobar'], group='AGENT')
+ with testtools.ExpectedException(ValueError):
+ self.mod_agent.create_agent_config_map(cfg.CONF)
+
+ def test_create_agent_config_map_multiple_tunnel_types(self):
+ cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE,
+ p_const.TYPE_VXLAN], group='AGENT')
+ cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
+ self.assertEqual(cfgmap['tunnel_types'],
+ [p_const.TYPE_GRE, p_const.TYPE_VXLAN])
+
+
+class TestOFANeutronAgentOVSBridge(OFAAgentTestCase):
+
+ def setUp(self):
+ super(TestOFANeutronAgentOVSBridge, self).setUp()
+ self.br_name = 'bridge1'
+ self.root_helper = 'fake_helper'
+ self.ovs = self.mod_agent.OVSBridge(
+ self.br_name, self.root_helper, self.ryuapp)
+
+ def test_find_datapath_id(self):
+ with mock.patch.object(self.ovs, 'get_datapath_id',
+ return_value='12345'):
+ self.ovs.find_datapath_id()
+ self.assertEqual(self.ovs.datapath_id, '12345')
+
+ def _fake_get_datapath(self, app, datapath_id):
+ if self.ovs.retry_count >= 2:
+ datapath = mock.Mock()
+ datapath.ofproto_parser = mock.Mock()
+ return datapath
+ self.ovs.retry_count += 1
+ return None
+
+ def test_get_datapath_normal(self):
+ self.ovs.retry_count = 0
+ with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+ new=self._fake_get_datapath):
+ self.ovs.datapath_id = '0x64'
+ self.ovs.get_datapath(retry_max=4)
+ self.assertEqual(self.ovs.retry_count, 2)
+
+ def test_get_datapath_retry_out_by_default_time(self):
+ cfg.CONF.set_override('get_datapath_retry_times', 3, group='AGENT')
+ with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+ return_value=None) as mock_get_datapath:
+ with testtools.ExpectedException(SystemExit):
+ self.ovs.datapath_id = '0x64'
+ self.ovs.get_datapath(retry_max=3)
+ self.assertEqual(mock_get_datapath.call_count, 3)
+
+ def test_get_datapath_retry_out_by_specified_time(self):
+ with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
+ return_value=None) as mock_get_datapath:
+ with testtools.ExpectedException(SystemExit):
+ self.ovs.datapath_id = '0x64'
+ self.ovs.get_datapath(retry_max=2)
+ self.assertEqual(mock_get_datapath.call_count, 2)
+
+ def test_setup_ofp_default_par(self):
+ with contextlib.nested(
+ mock.patch.object(self.ovs, 'set_protocols'),
+ mock.patch.object(self.ovs, 'set_controller'),
+ mock.patch.object(self.ovs, 'find_datapath_id'),
+ mock.patch.object(self.ovs, 'get_datapath'),
+ ) as (mock_set_protocols, mock_set_controller,
+ mock_find_datapath_id, mock_get_datapath):
+ self.ovs.setup_ofp()
+ mock_set_protocols.assert_called_with('OpenFlow13')
+ mock_set_controller.assert_called_with(['tcp:127.0.0.1:6633'])
+ mock_get_datapath.assert_called_with(
+ cfg.CONF.AGENT.get_datapath_retry_times)
+ self.assertEqual(mock_find_datapath_id.call_count, 1)
+
+ def test_setup_ofp_specify_par(self):
+ controller_names = ['tcp:192.168.10.10:1234', 'tcp:172.17.16.20:5555']
+ with contextlib.nested(
+ mock.patch.object(self.ovs, 'set_protocols'),
+ mock.patch.object(self.ovs, 'set_controller'),
+ mock.patch.object(self.ovs, 'find_datapath_id'),
+ mock.patch.object(self.ovs, 'get_datapath'),
+ ) as (mock_set_protocols, mock_set_controller,
+ mock_find_datapath_id, mock_get_datapath):
+ self.ovs.setup_ofp(controller_names=controller_names,
+ protocols='OpenFlow133',
+ retry_max=11)
+ mock_set_protocols.assert_called_with('OpenFlow133')
+ mock_set_controller.assert_called_with(controller_names)
+ mock_get_datapath.assert_called_with(11)
+ self.assertEqual(mock_find_datapath_id.call_count, 1)
+
+ def test_setup_ofp_with_except(self):
+ with contextlib.nested(
+ mock.patch.object(self.ovs, 'set_protocols',
+ side_effect=RuntimeError),
+ mock.patch.object(self.ovs, 'set_controller'),
+ mock.patch.object(self.ovs, 'find_datapath_id'),
+ mock.patch.object(self.ovs, 'get_datapath'),
+ ) as (mock_set_protocols, mock_set_controller,
+ mock_find_datapath_id, mock_get_datapath):
+ with testtools.ExpectedException(SystemExit):
+ self.ovs.setup_ofp()
+
+
+class TestOFANeutronAgent(OFAAgentTestCase):
+
+ def setUp(self):
+ super(TestOFANeutronAgent, self).setUp()
+ notifier_p = mock.patch(NOTIFIER)
+ notifier_cls = notifier_p.start()
+ self.notifier = mock.Mock()
+ notifier_cls.return_value = self.notifier
+ # Avoid rpc initialization for unit tests
+ cfg.CONF.set_override('rpc_backend',
+ 'neutron.openstack.common.rpc.impl_fake')
+ kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
+
+ class MockFixedIntervalLoopingCall(object):
+ def __init__(self, f):
+ self.f = f
+
+ def start(self, interval=0):
+ self.f()
+
+ with contextlib.nested(
+ mock.patch.object(self.mod_agent.OFANeutronAgent,
+ 'setup_integration_br',
+ return_value=mock.Mock()),
+ mock.patch.object(self.mod_agent.OFANeutronAgent,
+ 'setup_ancillary_bridges',
+ return_value=[]),
+ mock.patch.object(self.mod_agent.OVSBridge,
+ 'get_local_port_mac',
+ return_value='00:00:00:00:00:01'),
+ mock.patch('neutron.agent.linux.utils.get_interface_mac',
+ return_value='00:00:00:00:00:01'),
+ mock.patch('neutron.openstack.common.loopingcall.'
+ 'FixedIntervalLoopingCall',
+ new=MockFixedIntervalLoopingCall)):
+ self.agent = self.mod_agent.OFANeutronAgent(self.ryuapp, **kwargs)
+ self.agent.tun_br = mock.Mock()
+ self.datapath = mock.Mock()
+ self.ofparser = mock.Mock()
+ self.datapath.ofparser = self.ofparser
+ self.ofparser.OFPMatch = mock.Mock()
+ self.ofparser.OFPMatch.return_value = mock.Mock()
+ self.ofparser.OFPFlowMod = mock.Mock()
+ self.ofparser.OFPFlowMod.return_value = mock.Mock()
+ self.agent.int_br.ofparser = self.ofparser
+
+ self.agent.sg_agent = mock.Mock()
+
+ def _mock_port_bound(self, ofport=None):
+ port = mock.Mock()
+ port.ofport = ofport
+ net_uuid = 'my-net-uuid'
+ with mock.patch.object(self.mod_agent.OVSBridge,
+ 'set_db_attribute',
+ return_value=True):
+ with mock.patch.object(self.agent,
+ 'ryu_send_msg') as ryu_send_msg_func:
+ self.agent.port_bound(port, net_uuid, 'local', None, None)
+ self.assertEqual(ryu_send_msg_func.called, ofport != -1)
+
+ def test_port_bound_deletes_flows_for_valid_ofport(self):
+ self._mock_port_bound(ofport=1)
+
+ def test_port_bound_ignores_flows_for_invalid_ofport(self):
+ self._mock_port_bound(ofport=-1)
+
+ def test_port_dead(self):
+ with mock.patch.object(self.mod_agent.OVSBridge,
+ 'set_db_attribute',
+ return_value=True):
+ with mock.patch.object(self.agent,
+ 'ryu_send_msg') as ryu_send_msg_func:
+ port = mock.Mock()
+ port.ofport = 2
+ self.agent.port_dead(port)
+ self.assertTrue(ryu_send_msg_func.called)
+
+ def mock_update_ports(self, vif_port_set=None, registered_ports=None):
+ with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
+ return_value=vif_port_set):
+ return self.agent.update_ports(registered_ports)
+
+ def test_update_ports_returns_none_for_unchanged_ports(self):
+ self.assertIsNone(self.mock_update_ports())
+
+ def test_update_ports_returns_port_changes(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
+ actual = self.mock_update_ports(vif_port_set, registered_ports)
+ self.assertEqual(expected, actual)
+
+ def test_treat_devices_added_returns_true_for_missing_device(self):
+ with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+ side_effect=Exception()):
+ self.assertTrue(self.agent.treat_devices_added([{}]))
+
+ def _mock_treat_devices_added(self, details, port, func_name):
+ """Mock treat devices added.
+
+ :param details: the details to return for the device
+ :param port: the port that get_vif_port_by_id should return
+ :param func_name: the function that should be called
+ :returns: whether the named function was called
+ """
+ with contextlib.nested(
+ mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+ return_value=details),
+ mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+ return_value=port),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent, func_name)
+ ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
+ self.assertFalse(self.agent.treat_devices_added([{}]))
+ return func.called
+
+ def test_treat_devices_added_ignores_invalid_ofport(self):
+ port = mock.Mock()
+ port.ofport = -1
+ self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
+ 'port_dead'))
+
+ def test_treat_devices_added_marks_unknown_port_as_dead(self):
+ port = mock.Mock()
+ port.ofport = 1
+ self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
+ 'port_dead'))
+
+ def test_treat_devices_added_updates_known_port(self):
+ details = mock.MagicMock()
+ details.__contains__.side_effect = lambda x: True
+ self.assertTrue(self._mock_treat_devices_added(details,
+ mock.Mock(),
+ 'treat_vif_port'))
+
+ def test_treat_devices_removed_returns_true_for_missing_device(self):
+ with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
+ side_effect=Exception()):
+ self.assertTrue(self.agent.treat_devices_removed([{}]))
+
+ def _mock_treat_devices_removed(self, port_exists):
+ details = dict(exists=port_exists)
+ with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
+ return_value=details):
+ with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
+ self.assertFalse(self.agent.treat_devices_removed([{}]))
+ self.assertTrue(port_unbound.called)
+
+ def test_treat_devices_removed_unbinds_port(self):
+ self._mock_treat_devices_removed(True)
+
+ def test_treat_devices_removed_ignores_missing_port(self):
+ self._mock_treat_devices_removed(False)
+
+ def test_process_network_ports(self):
+ reply = {'current': set(['tap0']),
+ 'removed': set(['eth0']),
+ 'added': set(['eth1'])}
+ with mock.patch.object(self.agent, 'treat_devices_added',
+ return_value=False) as device_added:
+ with mock.patch.object(self.agent, 'treat_devices_removed',
+ return_value=False) as device_removed:
+ self.assertFalse(self.agent.process_network_ports(reply))
+ device_added.assert_called_once_with(set(['eth1']))
+ device_removed.assert_called_once_with(set(['eth0']))
+
+ def test_report_state(self):
+ with mock.patch.object(self.agent.state_rpc,
+ "report_state") as report_st:
+ self.agent.int_br_device_count = 5
+ self.agent._report_state()
+ report_st.assert_called_with(self.agent.context,
+ self.agent.agent_state)
+ self.assertNotIn("start_flag", self.agent.agent_state)
+ self.assertEqual(
+ self.agent.agent_state["configurations"]["devices"],
+ self.agent.int_br_device_count
+ )
+
+ def test_network_delete(self):
+ with contextlib.nested(
+ mock.patch.object(self.agent, "reclaim_local_vlan"),
+ mock.patch.object(self.agent.tun_br, "cleanup_tunnel_port")
+ ) as (recl_fn, clean_tun_fn):
+ self.agent.network_delete("unused_context",
+ network_id="123")
+ self.assertFalse(recl_fn.called)
+ self.agent.local_vlan_map["123"] = "LVM object"
+ self.agent.network_delete("unused_context",
+ network_id="123")
+ self.assertFalse(clean_tun_fn.called)
+ recl_fn.assert_called_with("123")
+
+ def test_port_update(self):
+ with contextlib.nested(
+ mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
+ mock.patch.object(self.agent, "treat_vif_port"),
+ mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
+ mock.patch.object(self.agent.plugin_rpc, "update_device_down")
+ ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
+ port = {"id": "123",
+ "network_id": "124",
+ "admin_state_up": False}
+ getvif_fn.return_value = "vif_port_obj"
+ self.agent.port_update("unused_context",
+ port=port,
+ network_type="vlan",
+ segmentation_id="1",
+ physical_network="physnet")
+ treatvif_fn.assert_called_with("vif_port_obj", "123",
+ "124", "vlan", "physnet",
+ "1", False)
+ upddown_fn.assert_called_with(self.agent.context,
+ "123", self.agent.agent_id,
+ cfg.CONF.host)
+
+ port["admin_state_up"] = True
+ self.agent.port_update("unused_context",
+ port=port,
+ network_type="vlan",
+ segmentation_id="1",
+ physical_network="physnet")
+ updup_fn.assert_called_with(self.agent.context,
+ "123", self.agent.agent_id,
+ cfg.CONF.host)
+
+ def test_port_update_plugin_rpc_failed(self):
+ port = {'id': 1,
+ 'network_id': 1,
+ 'admin_state_up': True}
+ with contextlib.nested(
+ mock.patch.object(self.mod_agent.LOG, 'error'),
+ mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent, 'port_bound'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+ mock.patch.object(self.agent, 'port_dead')
+ ) as (log, _, device_up, _, device_down, _):
+ device_up.side_effect = rpc_common.Timeout
+ self.agent.port_update(mock.Mock(), port=port)
+ self.assertTrue(device_up.called)
+ self.assertEqual(log.call_count, 1)
+
+ log.reset_mock()
+ port['admin_state_up'] = False
+ device_down.side_effect = rpc_common.Timeout
+ self.agent.port_update(mock.Mock(), port=port)
+ self.assertTrue(device_down.called)
+ self.assertEqual(log.call_count, 1)
+
+ def test_setup_physical_bridges(self):
+ with contextlib.nested(
+ mock.patch.object(ip_lib, "device_exists"),
+ mock.patch.object(utils, "execute"),
+ mock.patch.object(self.mod_agent.OVSBridge, "add_port"),
+ mock.patch.object(self.mod_agent.OVSBridge, "delete_port"),
+ mock.patch.object(self.mod_agent.OVSBridge, "set_protocols"),
+ mock.patch.object(self.mod_agent.OVSBridge, "set_controller"),
+ mock.patch.object(self.mod_agent.OVSBridge, "get_datapath_id",
+ return_value='0xa'),
+ mock.patch.object(self.agent.int_br, "add_port"),
+ mock.patch.object(self.agent.int_br, "delete_port"),
+ mock.patch.object(ip_lib.IPWrapper, "add_veth"),
+ mock.patch.object(ip_lib.IpLinkCommand, "delete"),
+ mock.patch.object(ip_lib.IpLinkCommand, "set_up"),
+ mock.patch.object(ip_lib.IpLinkCommand, "set_mtu"),
+ mock.patch.object(self.mod_agent.ryu_api, "get_datapath",
+ return_value=self.datapath)
+ ) as (devex_fn, utilsexec_fn,
+ ovs_addport_fn, ovs_delport_fn, ovs_set_protocols_fn,
+ ovs_set_controller_fn, ovs_datapath_id_fn, br_addport_fn,
+ br_delport_fn, addveth_fn, linkdel_fn, linkset_fn, linkmtu_fn,
+ ryu_api_fn):
+ devex_fn.return_value = True
+ parent = mock.MagicMock()
+ parent.attach_mock(utilsexec_fn, 'utils_execute')
+ parent.attach_mock(linkdel_fn, 'link_delete')
+ parent.attach_mock(addveth_fn, 'add_veth')
+ addveth_fn.return_value = (ip_lib.IPDevice("int-br-eth1"),
+ ip_lib.IPDevice("phy-br-eth1"))
+ ovs_addport_fn.return_value = "25"
+ br_addport_fn.return_value = "11"
+ self.agent.setup_physical_bridges({"physnet1": "br-eth"})
+ expected_calls = [mock.call.link_delete(),
+ mock.call.utils_execute(['/sbin/udevadm',
+ 'settle',
+ '--timeout=10']),
+ mock.call.add_veth('int-br-eth',
+ 'phy-br-eth')]
+ parent.assert_has_calls(expected_calls, any_order=False)
+ self.assertEqual(self.agent.int_ofports["physnet1"],
+ "11")
+ self.assertEqual(self.agent.phys_ofports["physnet1"],
+ "25")
+
+ def test_port_unbound(self):
+ with mock.patch.object(self.agent, "reclaim_local_vlan") as reclvl_fn:
+ self.agent.enable_tunneling = True
+ lvm = mock.Mock()
+ lvm.network_type = "gre"
+ lvm.vif_ports = {"vif1": mock.Mock()}
+ self.agent.local_vlan_map["netuid12345"] = lvm
+ self.agent.port_unbound("vif1", "netuid12345")
+ self.assertTrue(reclvl_fn.called)
+ reclvl_fn.called = False
+
+ lvm.vif_ports = {}
+ self.agent.port_unbound("vif1", "netuid12345")
+ self.assertEqual(reclvl_fn.call_count, 2)
+
+ lvm.vif_ports = {"vif1": mock.Mock()}
+ self.agent.port_unbound("vif3", "netuid12345")
+ self.assertEqual(reclvl_fn.call_count, 2)
+
+ def _check_ovs_vxlan_version(self, installed_usr_version,
+ installed_klm_version,
+ expecting_ok):
+ with mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
+ ) as klm_cmd:
+ with mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version'
+ ) as usr_cmd:
+ try:
+ klm_cmd.return_value = installed_klm_version
+ usr_cmd.return_value = installed_usr_version
+ self.agent.tunnel_types = 'vxlan'
+ self.agent._check_ovs_version()
+ version_ok = True
+ except SystemExit as e:
+ self.assertEqual(e.code, 1)
+ version_ok = False
+ self.assertEqual(version_ok, expecting_ok)
+
+ def test_check_minimum_version(self):
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
+ expecting_ok=True)
+
+ def test_check_future_version(self):
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
+ expecting_ok=True)
+
+ def test_check_fail_version(self):
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
+ expecting_ok=False)
+
+ def test_check_fail_no_version(self):
+ self._check_ovs_vxlan_version(None, None,
+ expecting_ok=False)
+
+ def test_check_fail_klm_version(self):
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ install_ver = str(float(min_vxlan_ver) - 0.01)
+ self._check_ovs_vxlan_version(min_vxlan_ver, install_ver,
+ expecting_ok=False)
+
+ def test_daemon_loop_uses_polling_manager(self):
+ with mock.patch(
+ 'neutron.agent.linux.polling.get_polling_manager'
+ ) as mock_get_pm:
+ fake_pm = mock.Mock()
+ mock_get_pm.return_value = fake_pm
+ fake_pm.__enter__ = mock.Mock()
+ fake_pm.__exit__ = mock.Mock()
+ with mock.patch.object(
+ self.agent, 'ovsdb_monitor_loop'
+ ) as mock_loop:
+ self.agent.daemon_loop()
+ mock_get_pm.assert_called_once_with(True, 'fake_helper',
+ constants.DEFAULT_OVSDBMON_RESPAWN)
+ mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
+
+ def test_setup_tunnel_port_error_negative(self):
+ with contextlib.nested(
+ mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
+ return_value='-1'),
+ mock.patch.object(self.mod_agent.LOG, 'error')
+ ) as (add_tunnel_port_fn, log_error_fn):
+ ofport = self.agent.setup_tunnel_port(
+ 'gre-1', 'remote_ip', p_const.TYPE_GRE)
+ add_tunnel_port_fn.assert_called_once_with(
+ 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
+ self.agent.vxlan_udp_port)
+ log_error_fn.assert_called_once_with(
+ _("Failed to set-up %(type)s tunnel port to %(ip)s"),
+ {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
+ self.assertEqual(ofport, 0)
+
+ def test_setup_tunnel_port_error_not_int(self):
+ with contextlib.nested(
+ mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
+ return_value=None),
+ mock.patch.object(self.mod_agent.LOG, 'exception'),
+ mock.patch.object(self.mod_agent.LOG, 'error')
+ ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
+ ofport = self.agent.setup_tunnel_port(
+ 'gre-1', 'remote_ip', p_const.TYPE_GRE)
+ add_tunnel_port_fn.assert_called_once_with(
+ 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
+ self.agent.vxlan_udp_port)
+ log_exc_fn.assert_called_once_with(
+ _("ofport should have a value that can be "
+ "interpreted as an integer"))
+ log_error_fn.assert_called_once_with(
+ _("Failed to set-up %(type)s tunnel port to %(ip)s"),
+ {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
+ self.assertEqual(ofport, 0)
+
+
+class AncillaryBridgesTest(OFAAgentTestCase):
+
+ def setUp(self):
+ super(AncillaryBridgesTest, self).setUp()
+ notifier_p = mock.patch(NOTIFIER)
+ notifier_cls = notifier_p.start()
+ self.notifier = mock.Mock()
+ notifier_cls.return_value = self.notifier
+ # Avoid rpc initialization for unit tests
+ cfg.CONF.set_override('rpc_backend',
+ 'neutron.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_override('report_interval', 0, 'AGENT')
+ self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
+
+ def _test_ancillary_bridges(self, bridges, ancillary):
+ device_ids = ancillary[:]
+
+ def pullup_side_effect(self, *args):
+ result = device_ids.pop(0)
+ return result
+
+ with contextlib.nested(
+ mock.patch.object(self.mod_agent.OFANeutronAgent,
+ 'setup_integration_br',
+ return_value=mock.Mock()),
+ mock.patch('neutron.agent.linux.utils.get_interface_mac',
+ return_value='00:00:00:00:00:01'),
+ mock.patch.object(self.mod_agent.OVSBridge,
+ 'get_local_port_mac',
+ return_value='00:00:00:00:00:01'),
+ mock.patch('neutron.agent.linux.ovs_lib.get_bridges',
+ return_value=bridges),
+ mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id',
+ side_effect=pullup_side_effect)):
+ self.agent = self.mod_agent.OFANeutronAgent(
+ self.ryuapp, **self.kwargs)
+ self.assertEqual(len(ancillary), len(self.agent.ancillary_brs))
+ if ancillary:
+ bridges = [br.br_name for br in self.agent.ancillary_brs]
+ for br in ancillary:
+ self.assertIn(br, bridges)
+
+ def test_ancillary_bridges_single(self):
+ bridges = ['br-int', 'br-ex']
+ self._test_ancillary_bridges(bridges, ['br-ex'])
+
+ def test_ancillary_bridges_none(self):
+ bridges = ['br-int']
+ self._test_ancillary_bridges(bridges, [])
+
+ def test_ancillary_bridges_multiple(self):
+ bridges = ['br-int', 'br-ex1', 'br-ex2']
+ self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
from neutron.agent.linux import utils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import uuidutils
+from neutron.plugins.openvswitch.common import constants
from neutron.tests import base
from neutron.tests import tools
# test __str__
str(port)
+ def test_set_controller(self):
+ controller_names = ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555']
+ self.br.set_controller(controller_names)
+ self.execute.assert_called_once_with(
+ ['ovs-vsctl', self.TO, '--', 'set-controller', self.BR_NAME,
+ 'tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'],
+ root_helper=self.root_helper)
+
+ def test_del_controller(self):
+ self.br.del_controller()
+ self.execute.assert_called_once_with(
+ ['ovs-vsctl', self.TO, '--', 'del-controller', self.BR_NAME],
+ root_helper=self.root_helper)
+
+ def test_get_controller(self):
+ self.execute.return_value = 'tcp:127.0.0.1:6633\ntcp:172.17.16.10:5555'
+ names = self.br.get_controller()
+ self.assertEqual(names,
+ ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'])
+ self.execute.assert_called_once_with(
+ ['ovs-vsctl', self.TO, '--', 'get-controller', self.BR_NAME],
+ root_helper=self.root_helper)
+
+ def test_set_protocols(self):
+ protocols = 'OpenFlow13'
+ self.br.set_protocols(protocols)
+ self.execute.assert_called_once_with(
+ ['ovs-vsctl', self.TO, '--', 'set', 'bridge', self.BR_NAME,
+ "protocols=%s" % protocols],
+ root_helper=self.root_helper)
+
def test_create(self):
self.br.add_bridge(self.BR_NAME)
data = [[["map", external_ids], "tap99", 1]]
self.assertIsNone(self._test_get_vif_port_by_id('tap99id', data,
"br-ext"))
+
+ def _check_ovs_vxlan_version(self, installed_usr_version,
+ installed_klm_version,
+ expecting_ok):
+ with mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
+ ) as klm_cmd:
+ with mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version'
+ ) as usr_cmd:
+ try:
+ klm_cmd.return_value = installed_klm_version
+ usr_cmd.return_value = installed_usr_version
+ ovs_lib.check_ovs_vxlan_version(root_helper='sudo')
+ version_ok = True
+ except SystemError:
+ version_ok = False
+ self.assertEqual(version_ok, expecting_ok)
+
+ def test_check_minimum_version(self):
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
+ expecting_ok=True)
+
+ def test_check_future_version(self):
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
+ expecting_ok=True)
+
+ def test_check_fail_version(self):
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
+ expecting_ok=False)
+
+ def test_check_fail_no_version(self):
+ self._check_ovs_vxlan_version(None, None,
+ expecting_ok=False)
+
+ def test_check_fail_klm_version(self):
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ install_ver = str(float(min_vxlan_ver) - 0.01)
+ self._check_ovs_vxlan_version(min_vxlan_ver,
+ install_ver,
+ expecting_ok=False)
self.assertEqual(reclvl_fn.call_count, 2)
def _check_ovs_vxlan_version(self, installed_usr_version,
- installed_klm_version, min_vers,
+ installed_klm_version,
expecting_ok):
with mock.patch(
'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version'
klm_cmd.return_value = installed_klm_version
usr_cmd.return_value = installed_usr_version
self.agent.tunnel_types = 'vxlan'
- ovs_neutron_agent.check_ovs_version(min_vers,
- root_helper='sudo')
+ self.agent._check_ovs_version()
version_ok = True
except SystemExit as e:
self.assertEqual(e.code, 1)
self.assertEqual(version_ok, expecting_ok)
def test_check_minimum_version(self):
- self._check_ovs_vxlan_version('1.10', '1.10',
- constants.MINIMUM_OVS_VXLAN_VERSION,
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver,
expecting_ok=True)
def test_check_future_version(self):
- self._check_ovs_vxlan_version('1.11', '1.11',
- constants.MINIMUM_OVS_VXLAN_VERSION,
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
expecting_ok=True)
def test_check_fail_version(self):
- self._check_ovs_vxlan_version('1.9', '1.9',
- constants.MINIMUM_OVS_VXLAN_VERSION,
+ install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01)
+ self._check_ovs_vxlan_version(install_ver, install_ver,
expecting_ok=False)
def test_check_fail_no_version(self):
self._check_ovs_vxlan_version(None, None,
- constants.MINIMUM_OVS_VXLAN_VERSION,
expecting_ok=False)
def test_check_fail_klm_version(self):
- self._check_ovs_vxlan_version('1.10', '1.9',
- constants.MINIMUM_OVS_VXLAN_VERSION,
+ min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION
+ install_ver = str(float(min_vxlan_ver) - 0.01)
+ self._check_ovs_vxlan_version(min_vxlan_ver, install_ver,
expecting_ok=False)
def _prepare_l2_pop_ofports(self):
etc/neutron/plugins/ml2/ml2_conf_arista.ini
etc/neutron/plugins/ml2/ml2_conf_cisco.ini
etc/neutron/plugins/bigswitch/restproxy.ini
+ etc/neutron/plugins/ml2/ml2_conf_ofa.ini
etc/neutron/plugins/mlnx = etc/neutron/plugins/mlnx/mlnx_conf.ini
etc/neutron/plugins/nec = etc/neutron/plugins/nec/nec.ini
etc/neutron/plugins/nicira = etc/neutron/plugins/nicira/nvp.ini
quantum-rootwrap = oslo.rootwrap.cmd:main
quantum-usage-audit = neutron.cmd.usage_audit:main
neutron-metering-agent = neutron.services.metering.agents.metering_agent:main
+ neutron-ofagent-agent = ryu.cmd.ofa_neutron_agent:main
neutron.core_plugins =
bigswitch = neutron.plugins.bigswitch.plugin:NeutronRestProxyV2
brocade = neutron.plugins.brocade.NeutronPlugin:BrocadePluginV2
cisco_nexus = neutron.plugins.ml2.drivers.cisco.nexus.mech_cisco_nexus:CiscoNexusMechanismDriver
l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver
bigswitch = neutron.plugins.ml2.drivers.mech_bigswitch.driver:BigSwitchMechanismDriver
+ ofagent = neutron.plugins.ml2.drivers.mech_ofagent:OfagentMechanismDriver
neutron.openstack.common.cache.backends =
memory = neutron.openstack.common.cache._backends.memory:MemoryBackend