From: fumihiko kakuma Date: Wed, 29 Jan 2014 01:54:12 +0000 (+0900) Subject: Implement OpenFlow Agent mechanism driver X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=16b7ddaadeabcd53abe6e1f62dffd4d641bbcbf7;p=openstack-build%2Fneutron-build.git Implement OpenFlow Agent mechanism driver This adds ML2 mechanism driver controlling OpenFlow switches and an agent using Ryu as OpenFlow Python library. - An agent acts as an OpenFlow controller on each compute nodes. - OpenFlow 1.3 (vendor agnostic unlike OVS extensions). Implements: blueprint ryu-ml2-driver Change-Id: I6a8168d24f911996639179d91c4da49151751057 --- diff --git a/etc/neutron/plugins/ml2/ml2_conf_ofa.ini b/etc/neutron/plugins/ml2/ml2_conf_ofa.ini new file mode 100644 index 000000000..4a94b9870 --- /dev/null +++ b/etc/neutron/plugins/ml2/ml2_conf_ofa.ini @@ -0,0 +1,13 @@ +# Defines configuration options specific to the OpenFlow Agent Mechanism Driver + +[ovs] +# Please refer to configuration options to the OpenvSwitch + +[agent] +# (IntOpt) Number of seconds to retry acquiring an Open vSwitch datapath. +# This is an optional parameter, default value is 60 seconds. +# +# get_datapath_retry_times = +# Example: get_datapath_retry_times = 30 + +# Please refer to configuration options to the OpenvSwitch else the above. diff --git a/neutron/agent/linux/ovs_lib.py b/neutron/agent/linux/ovs_lib.py index c040d6ecc..a6bddec3c 100644 --- a/neutron/agent/linux/ovs_lib.py +++ b/neutron/agent/linux/ovs_lib.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import distutils.version as dist_version import re from oslo.config import cfg @@ -106,6 +107,27 @@ class OVSBridge(BaseOVS): self.defer_apply_flows = False self.deferred_flows = {'add': '', 'mod': '', 'del': ''} + def set_controller(self, controller_names): + vsctl_command = ['--', 'set-controller', self.br_name] + vsctl_command.extend(controller_names) + self.run_vsctl(vsctl_command, check_error=True) + + def del_controller(self): + self.run_vsctl(['--', 'del-controller', self.br_name], + check_error=True) + + def get_controller(self): + res = self.run_vsctl(['--', 'get-controller', self.br_name], + check_error=True) + if res: + return res.strip().split('\n') + return res + + def set_protocols(self, protocols): + self.run_vsctl(['--', 'set', 'bridge', self.br_name, + "protocols=%s" % protocols], + check_error=True) + def create(self): self.add_bridge(self.br_name) @@ -401,7 +423,7 @@ class OVSBridge(BaseOVS): ofport = data[ofport_idx] # ofport must be integer otherwise return None if not isinstance(ofport, int) or ofport == -1: - LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a" + LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a " "positive integer"), {'ofport': ofport, 'vif': port_id}) return @@ -483,3 +505,44 @@ def get_bridge_external_bridge_id(root_helper, bridge): except Exception: LOG.exception(_("Bridge %s not found."), bridge) return None + + +def _compare_installed_and_required_version( + installed_version, required_version, check_type, version_type): + if installed_version: + if dist_version.StrictVersion( + installed_version) < dist_version.StrictVersion( + required_version): + msg = (_('Failed %(ctype)s version check for Open ' + 'vSwitch with %(vtype)s support. To use ' + '%(vtype)s tunnels with OVS, please ensure ' + 'the OVS version is %(required)s or newer!') % + {'ctype': check_type, 'vtype': version_type, + 'required': required_version}) + raise SystemError(msg) + else: + msg = (_('Unable to determine %(ctype)s version for Open ' + 'vSwitch with %(vtype)s support. To use ' + '%(vtype)s tunnels with OVS, please ensure ' + 'that the version is %(required)s or newer!') % + {'ctype': check_type, 'vtype': version_type, + 'required': required_version}) + raise SystemError(msg) + + +def check_ovs_vxlan_version(root_helper): + min_required_version = constants.MINIMUM_OVS_VXLAN_VERSION + installed_klm_version = get_installed_ovs_klm_version() + installed_usr_version = get_installed_ovs_usr_version(root_helper) + LOG.debug(_("Checking OVS version for VXLAN support " + "installed klm version is %s "), installed_klm_version) + LOG.debug(_("Checking OVS version for VXLAN support " + "installed usr version is %s"), installed_usr_version) + # First check the userspace version + _compare_installed_and_required_version(installed_usr_version, + min_required_version, + 'userspace', 'VXLAN') + # Now check the kernel version + _compare_installed_and_required_version(installed_klm_version, + min_required_version, + 'kernel', 'VXLAN') diff --git a/neutron/common/constants.py b/neutron/common/constants.py index 92d4a5615..4975c013f 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -73,6 +73,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent' AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent' AGENT_TYPE_HYPERV = 'HyperV agent' AGENT_TYPE_NEC = 'NEC plugin agent' +AGENT_TYPE_OFA = 'OFA driver agent' AGENT_TYPE_L3 = 'L3 agent' AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent' AGENT_TYPE_MLNX = 'Mellanox plugin agent' diff --git a/neutron/plugins/ml2/drivers/mech_ofagent.py b/neutron/plugins/ml2/drivers/mech_ofagent.py new file mode 100644 index 000000000..3d3909b01 --- /dev/null +++ b/neutron/plugins/ml2/drivers/mech_ofagent.py @@ -0,0 +1,60 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# Based on openvswitch mechanism driver. +# +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +from neutron.common import constants +from neutron.extensions import portbindings +from neutron.openstack.common import log +from neutron.plugins.common import constants as p_const +from neutron.plugins.ml2 import driver_api as api +from neutron.plugins.ml2.drivers import mech_agent + +LOG = log.getLogger(__name__) + + +class OfagentMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase): + """Attach to networks using ofagent L2 agent. + + The OfagentMechanismDriver integrates the ml2 plugin with the + ofagent L2 agent. Port binding with this driver requires the + ofagent agent to be running on the port's host, and that agent + to have connectivity to at least one segment of the port's + network. + """ + + def __init__(self): + super(OfagentMechanismDriver, self).__init__( + constants.AGENT_TYPE_OFA, + portbindings.VIF_TYPE_OVS, + {portbindings.CAP_PORT_FILTER: True}) + + def check_segment_for_agent(self, segment, agent): + mappings = agent['configurations'].get('bridge_mappings', {}) + tunnel_types = agent['configurations'].get('tunnel_types', []) + LOG.debug(_("Checking segment: %(segment)s " + "for mappings: %(mappings)s " + "with tunnel_types: %(tunnel_types)s"), + {'segment': segment, 'mappings': mappings, + 'tunnel_types': tunnel_types}) + network_type = segment[api.NETWORK_TYPE] + return ( + network_type == p_const.TYPE_LOCAL or + network_type in tunnel_types or + (network_type in [p_const.TYPE_FLAT, p_const.TYPE_VLAN] and + segment[api.PHYSICAL_NETWORK] in mappings) + ) diff --git a/neutron/plugins/ofagent/README b/neutron/plugins/ofagent/README new file mode 100644 index 000000000..a43b0dd07 --- /dev/null +++ b/neutron/plugins/ofagent/README @@ -0,0 +1,21 @@ +This directory includes agent for OpenFlow Agent mechanism driver. + +# -- Installation + +For how to install/set up ML2 mechanism driver for OpenFlow Agent, please refer to +https://github.com/osrg/ryu/wiki/OpenStack + +# -- Ryu General + +For general Ryu stuff, please refer to +http://www.osrg.net/ryu/ + +Ryu is available at github +git://github.com/osrg/ryu.git +https://github.com/osrg/ryu + +The mailing is at +ryu-devel@lists.sourceforge.net +https://lists.sourceforge.net/lists/listinfo/ryu-devel + +Enjoy! diff --git a/neutron/plugins/ofagent/__init__.py b/neutron/plugins/ofagent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/plugins/ofagent/agent/__init__.py b/neutron/plugins/ofagent/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py new file mode 100644 index 000000000..d428be2ab --- /dev/null +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -0,0 +1,1381 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# Based on openvswitch agent. +# +# Copyright 2011 Nicira Networks, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +import time + +from oslo.config import cfg +from ryu.app.ofctl import api as ryu_api +from ryu.base import app_manager +from ryu.lib import hub +from ryu.ofproto import ofproto_v1_3 as ryu_ofp13 + +from neutron.agent.linux import ip_lib +from neutron.agent.linux import ovs_lib +from neutron.agent.linux import polling +from neutron.agent.linux import utils +from neutron.agent import rpc as agent_rpc +from neutron.agent import securitygroups_rpc as sg_rpc +from neutron.common import constants as n_const +from neutron.common import topics +from neutron.common import utils as n_utils +from neutron import context +from neutron.extensions import securitygroup as ext_sg +from neutron.openstack.common import log as logging +from neutron.openstack.common import loopingcall +from neutron.openstack.common.rpc import common as rpc_common +from neutron.openstack.common.rpc import dispatcher +from neutron.plugins.common import constants as p_const +from neutron.plugins.ofagent.common import config # noqa +from neutron.plugins.openvswitch.common import constants + + +LOG = logging.getLogger(__name__) + +# A placeholder for dead vlans. +DEAD_VLAN_TAG = str(n_const.MAX_VLAN_TAG + 1) + + +# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac' +# attributes set). +class LocalVLANMapping: + def __init__(self, vlan, network_type, physical_network, segmentation_id, + vif_ports=None): + if vif_ports is None: + vif_ports = {} + self.vlan = vlan + self.network_type = network_type + self.physical_network = physical_network + self.segmentation_id = segmentation_id + self.vif_ports = vif_ports + # set of tunnel ports on which packets should be flooded + self.tun_ofports = set() + + def __str__(self): + return ("lv-id = %s type = %s phys-net = %s phys-id = %s" % + (self.vlan, self.network_type, self.physical_network, + self.segmentation_id)) + + +class Port(object): + """Represents a neutron port. + + Class stores port data in a ORM-free way, so attributres are + still available even if a row has been deleted. + """ + + def __init__(self, p): + self.id = p.id + self.network_id = p.network_id + self.device_id = p.device_id + self.admin_state_up = p.admin_state_up + self.status = p.status + + def __eq__(self, other): + """Compare only fields that will cause us to re-wire.""" + try: + return (other and self.id == other.id + and self.admin_state_up == other.admin_state_up) + except Exception: + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.id) + + +class OVSBridge(ovs_lib.OVSBridge): + def __init__(self, br_name, root_helper, ryuapp): + super(OVSBridge, self).__init__(br_name, root_helper) + self.datapath_id = None + self.datapath = None + self.ofparser = None + self.ryuapp = ryuapp + + def find_datapath_id(self): + self.datapath_id = self.get_datapath_id() + + def get_datapath(self, retry_max=cfg.CONF.AGENT.get_datapath_retry_times): + retry = 0 + while self.datapath is None: + self.datapath = ryu_api.get_datapath(self.ryuapp, + int(self.datapath_id, 16)) + retry += 1 + if retry >= retry_max: + LOG.error(_('Agent terminated!: Failed to get a datapath.')) + raise SystemExit(1) + time.sleep(1) + self.ofparser = self.datapath.ofproto_parser + + def setup_ofp(self, controller_names=None, + protocols='OpenFlow13', + retry_max=cfg.CONF.AGENT.get_datapath_retry_times): + if not controller_names: + host = cfg.CONF.ofp_listen_host + if not host: + # 127.0.0.1 is a default for agent style of controller + host = '127.0.0.1' + controller_names = ["tcp:%s:%d" % (host, + cfg.CONF.ofp_tcp_listen_port)] + try: + self.set_protocols(protocols) + self.set_controller(controller_names) + except RuntimeError: + LOG.exception(_("Agent terminated")) + raise SystemExit(1) + self.find_datapath_id() + self.get_datapath(retry_max) + + +class OFAPluginApi(agent_rpc.PluginApi, + sg_rpc.SecurityGroupServerRpcApiMixin): + pass + + +class OFASecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin): + def __init__(self, context, plugin_rpc, root_helper): + self.context = context + self.plugin_rpc = plugin_rpc + self.root_helper = root_helper + self.init_firewall() + + +class OFANeutronAgentRyuApp(app_manager.RyuApp): + OFP_VERSIONS = [ryu_ofp13.OFP_VERSION] + + def start(self): + + super(OFANeutronAgentRyuApp, self).start() + return hub.spawn(self._agent_main, self) + + def _agent_main(self, ryuapp): + cfg.CONF.register_opts(ip_lib.OPTS) + + try: + agent_config = create_agent_config_map(cfg.CONF) + except ValueError: + LOG.exception(_("Agent failed to create agent config map")) + raise SystemExit(1) + + is_xen_compute_host = ('rootwrap-xen-dom0' in + agent_config['root_helper']) + if is_xen_compute_host: + # Force ip_lib to always use the root helper to ensure that ip + # commands target xen dom0 rather than domU. + cfg.CONF.set_default('ip_lib_force_root', True) + + agent = OFANeutronAgent(ryuapp, **agent_config) + + # Start everything. + LOG.info(_("Agent initialized successfully, now running... ")) + agent.daemon_loop() + + +class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): + """A agent for OpenFlow Agent ML2 mechanism driver. + + OFANeutronAgent is a OpenFlow Agent agent for a ML2 plugin. + This is as a ryu application thread. + - An agent acts as an OpenFlow controller on each compute nodes. + - OpenFlow 1.3 (vendor agnostic unlike OVS extensions). + """ + + # history + # 1.0 Initial version + # 1.1 Support Security Group RPC + RPC_API_VERSION = '1.1' + + def __init__(self, ryuapp, integ_br, tun_br, local_ip, + bridge_mappings, root_helper, + polling_interval, tunnel_types=None, + veth_mtu=None, l2_population=False, + minimize_polling=False, + ovsdb_monitor_respawn_interval=( + constants.DEFAULT_OVSDBMON_RESPAWN)): + """Constructor. + + :param ryuapp: object of the ryu app. + :param integ_br: name of the integration bridge. + :param tun_br: name of the tunnel bridge. + :param local_ip: local IP address of this hypervisor. + :param bridge_mappings: mappings from physical network name to bridge. + :param root_helper: utility to use when running shell cmds. + :param polling_interval: interval (secs) to poll DB. + :param tunnel_types: A list of tunnel types to enable support for in + the agent. If set, will automatically set enable_tunneling to + True. + :param veth_mtu: MTU size for veth interfaces. + :param minimize_polling: Optional, whether to minimize polling by + monitoring ovsdb for interface changes. + :param ovsdb_monitor_respawn_interval: Optional, when using polling + minimization, the number of seconds to wait before respawning + the ovsdb monitor. + """ + self.ryuapp = ryuapp + self.veth_mtu = veth_mtu + self.root_helper = root_helper + self.available_local_vlans = set(xrange(n_const.MIN_VLAN_TAG, + n_const.MAX_VLAN_TAG)) + self.tunnel_types = tunnel_types or [] + self.l2_pop = l2_population + self.agent_state = { + 'binary': 'neutron-ofa-agent', + 'host': cfg.CONF.host, + 'topic': n_const.L2_AGENT_TOPIC, + 'configurations': {'bridge_mappings': bridge_mappings, + 'tunnel_types': self.tunnel_types, + 'tunneling_ip': local_ip, + 'l2_population': self.l2_pop}, + 'agent_type': n_const.AGENT_TYPE_OFA, + 'start_flag': True} + + # Keep track of int_br's device count for use by _report_state() + self.int_br_device_count = 0 + + self.int_br = OVSBridge(integ_br, self.root_helper, self.ryuapp) + self.setup_rpc() + self.setup_integration_br() + self.setup_physical_bridges(bridge_mappings) + self.local_vlan_map = {} + self.tun_br_ofports = {p_const.TYPE_GRE: {}, + p_const.TYPE_VXLAN: {}} + + self.polling_interval = polling_interval + self.minimize_polling = minimize_polling + self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval + + self.enable_tunneling = bool(self.tunnel_types) + self.local_ip = local_ip + self.tunnel_count = 0 + self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port + self._check_ovs_version() + if self.enable_tunneling: + self.setup_tunnel_br(tun_br) + # Collect additional bridges to monitor + self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br) + + # Security group agent support + self.sg_agent = OFASecurityGroupAgent(self.context, + self.plugin_rpc, + self.root_helper) + # Initialize iteration counter + self.iter_num = 0 + + def _check_ovs_version(self): + if p_const.TYPE_VXLAN in self.tunnel_types: + try: + ovs_lib.check_ovs_vxlan_version(self.root_helper) + except SystemError: + LOG.exception(_("Agent terminated")) + raise SystemExit(1) + + def _report_state(self): + # How many devices are likely used by a VM + self.agent_state.get('configurations')['devices'] = ( + self.int_br_device_count) + try: + self.state_rpc.report_state(self.context, + self.agent_state) + self.agent_state.pop('start_flag', None) + except Exception: + LOG.exception(_("Failed reporting state!")) + + def ryu_send_msg(self, msg): + result = ryu_api.send_msg(self.ryuapp, msg) + LOG.info(_("ryu send_msg() result: %s"), result) + + def setup_rpc(self): + mac = self.int_br.get_local_port_mac() + self.agent_id = '%s%s' % ('ovs', (mac.replace(":", ""))) + self.topic = topics.AGENT + self.plugin_rpc = OFAPluginApi(topics.PLUGIN) + self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) + + # RPC network init + self.context = context.get_admin_context_without_session() + # Handle updates from service + self.dispatcher = self.create_rpc_dispatcher() + # Define the listening consumers for the agent + consumers = [[topics.PORT, topics.UPDATE], + [topics.NETWORK, topics.DELETE], + [constants.TUNNEL, topics.UPDATE], + [topics.SECURITY_GROUP, topics.UPDATE]] + self.connection = agent_rpc.create_consumers(self.dispatcher, + self.topic, + consumers) + report_interval = cfg.CONF.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.FixedIntervalLoopingCall( + self._report_state) + heartbeat.start(interval=report_interval) + + def get_net_uuid(self, vif_id): + for network_id, vlan_mapping in self.local_vlan_map.iteritems(): + if vif_id in vlan_mapping.vif_ports: + return network_id + + def network_delete(self, context, **kwargs): + network_id = kwargs.get('network_id') + LOG.debug(_("network_delete received network %s"), network_id) + # The network may not be defined on this agent + lvm = self.local_vlan_map.get(network_id) + if lvm: + self.reclaim_local_vlan(network_id) + else: + LOG.debug(_("Network %s not used on agent."), network_id) + + def port_update(self, context, **kwargs): + port = kwargs.get('port') + LOG.debug(_("port_update received port %s"), port['id']) + # Validate that port is on OVS + vif_port = self.int_br.get_vif_port_by_id(port['id']) + if not vif_port: + return + + if ext_sg.SECURITYGROUPS in port: + self.sg_agent.refresh_firewall() + network_type = kwargs.get('network_type') + segmentation_id = kwargs.get('segmentation_id') + physical_network = kwargs.get('physical_network') + self.treat_vif_port(vif_port, port['id'], port['network_id'], + network_type, physical_network, + segmentation_id, port['admin_state_up']) + try: + if port['admin_state_up']: + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, port['id'], + self.agent_id, + cfg.CONF.host) + else: + # update plugin about port status + self.plugin_rpc.update_device_down(self.context, port['id'], + self.agent_id, + cfg.CONF.host) + except rpc_common.Timeout: + LOG.error(_("RPC timeout while updating port %s"), port['id']) + + def tunnel_update(self, context, **kwargs): + LOG.debug(_("tunnel_update received")) + if not self.enable_tunneling: + return + tunnel_ip = kwargs.get('tunnel_ip') + tunnel_id = kwargs.get('tunnel_id', tunnel_ip) + tunnel_type = kwargs.get('tunnel_type') + if not tunnel_type: + LOG.error(_("No tunnel_type specified, cannot create tunnels")) + return + if tunnel_type not in self.tunnel_types: + LOG.error(_("tunnel_type %s not supported by agent"), tunnel_type) + return + if tunnel_ip == self.local_ip: + return + tun_name = '%s-%s' % (tunnel_type, tunnel_id) + self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type) + + def create_rpc_dispatcher(self): + """Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + """ + return dispatcher.RpcDispatcher([self]) + + def _provision_local_vlan_outbound_for_tunnel(self, lvid, + segmentation_id, ofports): + br = self.tun_br + match = br.ofparser.OFPMatch( + vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT) + actions = [br.ofparser.OFPActionPopVlan(), + br.ofparser.OFPActionSetField( + tunnel_id=int(segmentation_id))] + for ofport in ofports: + actions.append(br.ofparser.OFPActionOutput(ofport, 0)) + instructions = [br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)] + msg = br.ofparser.OFPFlowMod( + br.datapath, + table_id=constants.FLOOD_TO_TUN, + priority=1, + match=match, instructions=instructions) + self.ryu_send_msg(msg) + + def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type, + segmentation_id): + br = self.tun_br + match = br.ofparser.OFPMatch( + tunnel_id=int(segmentation_id)) + actions = [br.ofparser.OFPActionSetField( + vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT)] + instructions = [ + br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, actions), + br.ofparser.OFPInstructionGotoTable( + table_id=constants.LEARN_FROM_TUN)] + msg = br.ofparser.OFPFlowMod( + br.datapath, + table_id=constants.TUN_TABLE[network_type], + priority=1, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + def _local_vlan_for_tunnel(self, lvid, network_type, segmentation_id): + ofports = [int(ofport) for ofport in + self.tun_br_ofports[network_type].values()] + if ofports: + self._provision_local_vlan_outbound_for_tunnel( + lvid, segmentation_id, ofports) + self._provision_local_vlan_inbound_for_tunnel(lvid, network_type, + segmentation_id) + + def _provision_local_vlan_outbound(self, br, lvid, actions, + physical_network): + match = br.ofparser.OFPMatch( + in_port=int(self.phys_ofports[physical_network]), + vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT) + instructions = [br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)] + msg = br.ofparser.OFPFlowMod(br.datapath, + priority=4, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + def _provision_local_vlan_inbound(self, lvid, vlan_vid, physical_network): + match = self.int_br.ofparser.OFPMatch( + in_port=int(self.int_ofports[physical_network]), + vlan_vid=vlan_vid) + actions = [self.int_br.ofparser.OFPActionSetField( + vlan_vid=int(lvid) | ryu_ofp13.OFPVID_PRESENT), + self.int_br.ofparser.OFPActionOutput( + ryu_ofp13.OFPP_NORMAL, 0)] + instructions = [self.int_br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, actions)] + msg = self.int_br.ofparser.OFPFlowMod( + self.int_br.datapath, + priority=3, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + def _local_vlan_for_flat(self, lvid, physical_network): + br = self.phys_brs[physical_network] + actions = [br.ofparser.OFPActionPopVlan(), + br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)] + self._provision_local_vlan_outbound( + br, lvid, actions, physical_network) + self._provision_local_vlan_inbound(lvid, 0xffff, physical_network) + + def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id): + br = self.phys_brs[physical_network] + actions = [br.ofparser.OFPActionSetField( + vlan_vid=int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT), + br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)] + self._provision_local_vlan_outbound( + br, lvid, actions, physical_network) + self._provision_local_vlan_inbound( + lvid, int(segmentation_id) | ryu_ofp13.OFPVID_PRESENT, + physical_network) + + def provision_local_vlan(self, net_uuid, network_type, physical_network, + segmentation_id): + """Provisions a local VLAN. + + :param net_uuid: the uuid of the network associated with this vlan. + :param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat', + 'local') + :param physical_network: the physical network for 'vlan' or 'flat' + :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' + """ + + if not self.available_local_vlans: + LOG.error(_("No local VLAN available for net-id=%s"), net_uuid) + return + lvid = self.available_local_vlans.pop() + LOG.info(_("Assigning %(vlan_id)s as local vlan for " + "net-id=%(net_uuid)s"), + {'vlan_id': lvid, 'net_uuid': net_uuid}) + self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type, + physical_network, + segmentation_id) + + if network_type in constants.TUNNEL_NETWORK_TYPES: + if self.enable_tunneling: + self._local_vlan_for_tunnel(lvid, network_type, + segmentation_id) + else: + LOG.error(_("Cannot provision %(network_type)s network for " + "net-id=%(net_uuid)s - tunneling disabled"), + {'network_type': network_type, + 'net_uuid': net_uuid}) + elif network_type == p_const.TYPE_FLAT: + if physical_network in self.phys_brs: + self._local_vlan_for_flat(lvid, physical_network) + else: + LOG.error(_("Cannot provision flat network for " + "net-id=%(net_uuid)s - no bridge for " + "physical_network %(physical_network)s"), + {'net_uuid': net_uuid, + 'physical_network': physical_network}) + elif network_type == p_const.TYPE_VLAN: + if physical_network in self.phys_brs: + self._local_vlan_for_vlan(lvid, physical_network, + segmentation_id) + else: + LOG.error(_("Cannot provision VLAN network for " + "net-id=%(net_uuid)s - no bridge for " + "physical_network %(physical_network)s"), + {'net_uuid': net_uuid, + 'physical_network': physical_network}) + elif network_type == p_const.TYPE_LOCAL: + # no flows needed for local networks + pass + else: + LOG.error(_("Cannot provision unknown network type " + "%(network_type)s for net-id=%(net_uuid)s"), + {'network_type': network_type, + 'net_uuid': net_uuid}) + + def _reclaim_local_vlan_outbound(self, lvm): + br = self.phys_brs[lvm.physical_network] + match = br.ofparser.OFPMatch( + in_port=self.phys_ofports[lvm.physical_network], + vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT) + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY, + match=match) + self.ryu_send_msg(msg) + + def _reclaim_local_vlan_inbound(self, lvm, vlan_vid): + br = self.int_br + match = br.ofparser.OFPMatch( + in_port=self.int_ofports[lvm.physical_network], + vlan_vid=vlan_vid) + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY, + match=match) + self.ryu_send_msg(msg) + + def reclaim_local_vlan(self, net_uuid): + """Reclaim a local VLAN. + + :param net_uuid: the network uuid associated with this vlan. + :param lvm: a LocalVLANMapping object that tracks (vlan, lsw_id, + vif_ids) mapping. + """ + lvm = self.local_vlan_map.pop(net_uuid, None) + if lvm is None: + LOG.debug(_("Network %s not used on agent."), net_uuid) + return + + LOG.info(_("Reclaiming vlan = %(vlan_id)s from net-id = %(net_uuid)s"), + {'vlan_id': lvm.vlan, + 'net_uuid': net_uuid}) + + if lvm.network_type in constants.TUNNEL_NETWORK_TYPES: + if self.enable_tunneling: + match = self.tun_br.ofparser.OFPMatch( + tunnel_id=int(lvm.segmentation_id)) + msg = self.tun_br.ofparser.OFPFlowMod( + self.tun_br.datapath, + table_id=constants.TUN_TABLE[lvm.network_type], + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY, + match=match) + self.ryu_send_msg(msg) + match = self.tun_br.ofparser.OFPMatch( + vlan_vid=int(lvm.vlan) | ryu_ofp13.OFPVID_PRESENT) + msg = self.tun_br.ofparser.OFPFlowMod( + self.tun_br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY, + match=match) + self.ryu_send_msg(msg) + elif lvm.network_type == p_const.TYPE_FLAT: + if lvm.physical_network in self.phys_brs: + self._reclaim_local_vlan_outbound(lvm) + self._reclaim_local_vlan_inbound(lvm, 0xffff) + elif lvm.network_type == p_const.TYPE_VLAN: + if lvm.physical_network in self.phys_brs: + self._reclaim_local_vlan_outbound(lvm) + self._reclaim_local_vlan_inbound( + lvm, lvm.segmentation_id | ryu_ofp13.OFPVID_PRESENT) + elif lvm.network_type == p_const.TYPE_LOCAL: + # no flows needed for local networks + pass + else: + LOG.error(_("Cannot reclaim unknown network type " + "%(network_type)s for net-id=%(net_uuid)s"), + {'network_type': lvm.network_type, + 'net_uuid': net_uuid}) + + self.available_local_vlans.add(lvm.vlan) + + def port_bound(self, port, net_uuid, + network_type, physical_network, segmentation_id): + """Bind port to net_uuid/lsw_id and install flow for inbound traffic + to vm. + + :param port: a ovs_lib.VifPort object. + :param net_uuid: the net_uuid this port is to be associated with. + :param network_type: the network type ('gre', 'vlan', 'flat', 'local') + :param physical_network: the physical network for 'vlan' or 'flat' + :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' + """ + if net_uuid not in self.local_vlan_map: + self.provision_local_vlan(net_uuid, network_type, + physical_network, segmentation_id) + lvm = self.local_vlan_map[net_uuid] + lvm.vif_ports[port.vif_id] = port + + self.int_br.set_db_attribute("Port", port.port_name, "tag", + str(lvm.vlan)) + if int(port.ofport) != -1: + match = self.int_br.ofparser.OFPMatch(in_port=port.ofport) + msg = self.int_br.ofparser.OFPFlowMod( + self.int_br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY, + match=match) + self.ryu_send_msg(msg) + + def port_unbound(self, vif_id, net_uuid=None): + """Unbind port. + + Removes corresponding local vlan mapping object if this is its last + VIF. + + :param vif_id: the id of the vif + :param net_uuid: the net_uuid this port is associated with. + """ + net_uuid = net_uuid or self.get_net_uuid(vif_id) + + if not self.local_vlan_map.get(net_uuid): + LOG.info(_('port_unbound() net_uuid %s not in local_vlan_map'), + net_uuid) + return + + lvm = self.local_vlan_map[net_uuid] + lvm.vif_ports.pop(vif_id, None) + + if not lvm.vif_ports: + self.reclaim_local_vlan(net_uuid) + + def port_dead(self, port): + """Once a port has no binding, put it on the "dead vlan". + + :param port: a ovs_lib.VifPort object. + """ + self.int_br.set_db_attribute("Port", port.port_name, "tag", + DEAD_VLAN_TAG) + match = self.int_br.ofparser.OFPMatch(in_port=int(port.ofport)) + msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath, + priority=2, match=match) + self.ryu_send_msg(msg) + + def setup_integration_br(self): + """Setup the integration bridge. + + Create patch ports and remove all existing flows. + + :param bridge_name: the name of the integration bridge. + :returns: the integration bridge + """ + self.int_br.setup_ofp() + self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port) + msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY) + self.ryu_send_msg(msg) + # switch all traffic using L2 learning + actions = [self.int_br.ofparser.OFPActionOutput( + ryu_ofp13.OFPP_NORMAL, 0)] + instructions = [self.int_br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, + actions)] + msg = self.int_br.ofparser.OFPFlowMod(self.int_br.datapath, + priority=1, + instructions=instructions) + self.ryu_send_msg(msg) + + def setup_ancillary_bridges(self, integ_br, tun_br): + """Setup ancillary bridges - for example br-ex.""" + ovs_bridges = set(ovs_lib.get_bridges(self.root_helper)) + # Remove all known bridges + ovs_bridges.remove(integ_br) + if self.enable_tunneling: + ovs_bridges.remove(tun_br) + br_names = [self.phys_brs[physical_network].br_name for + physical_network in self.phys_brs] + ovs_bridges.difference_update(br_names) + # Filter list of bridges to those that have external + # bridge-id's configured + br_names = [ + bridge for bridge in ovs_bridges + if bridge != ovs_lib.get_bridge_external_bridge_id( + self.root_helper, bridge) + ] + ovs_bridges.difference_update(br_names) + ancillary_bridges = [] + for bridge in ovs_bridges: + br = OVSBridge(bridge, self.root_helper, self.ryuapp) + ancillary_bridges.append(br) + LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges) + return ancillary_bridges + + def _tun_br_sort_incoming_traffic_depend_in_port(self, br): + match = br.ofparser.OFPMatch( + in_port=int(self.patch_int_ofport)) + instructions = [br.ofparser.OFPInstructionGotoTable( + table_id=constants.PATCH_LV_TO_TUN)] + msg = br.ofparser.OFPFlowMod(br.datapath, + priority=1, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + msg = br.ofparser.OFPFlowMod(br.datapath, priority=0) + self.ryu_send_msg(msg) + + def _tun_br_goto_table_ucast_unicast(self, br): + match = br.ofparser.OFPMatch(eth_dst=('00:00:00:00:00:00', + '01:00:00:00:00:00')) + instructions = [br.ofparser.OFPInstructionGotoTable( + table_id=constants.UCAST_TO_TUN)] + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=constants.PATCH_LV_TO_TUN, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + def _tun_br_goto_table_flood_broad_multi_cast(self, br): + match = br.ofparser.OFPMatch(eth_dst=('01:00:00:00:00:00', + '01:00:00:00:00:00')) + instructions = [br.ofparser.OFPInstructionGotoTable( + table_id=constants.FLOOD_TO_TUN)] + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=constants.PATCH_LV_TO_TUN, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + def _tun_br_set_table_tun_by_tunnel_type(self, br): + for tunnel_type in constants.TUNNEL_NETWORK_TYPES: + msg = br.ofparser.OFPFlowMod( + br.datapath, + table_id=constants.TUN_TABLE[tunnel_type], + priority=0) + self.ryu_send_msg(msg) + + def _tun_br_output_patch_int(self, br): + actions = [br.ofparser.OFPActionOutput( + int(self.patch_int_ofport), 0)] + instructions = [br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, + actions)] + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=constants.LEARN_FROM_TUN, + priority=1, + instructions=instructions) + self.ryu_send_msg(msg) + + def _tun_br_goto_table_flood_unknown_unicast(self, br): + instructions = [br.ofparser.OFPInstructionGotoTable( + table_id=constants.FLOOD_TO_TUN)] + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=constants.UCAST_TO_TUN, + priority=0, + instructions=instructions) + self.ryu_send_msg(msg) + + def _tun_br_default_drop(self, br): + msg = br.ofparser.OFPFlowMod( + br.datapath, + table_id=constants.FLOOD_TO_TUN, + priority=0) + self.ryu_send_msg(msg) + + def setup_tunnel_br(self, tun_br): + """Setup the tunnel bridge. + + Creates tunnel bridge, and links it to the integration bridge + using a patch port. + + :param tun_br: the name of the tunnel bridge. + """ + self.tun_br = OVSBridge(tun_br, self.root_helper, self.ryuapp) + self.tun_br.reset_bridge() + self.tun_br.setup_ofp() + self.patch_tun_ofport = self.int_br.add_patch_port( + cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port) + self.patch_int_ofport = self.tun_br.add_patch_port( + cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port) + if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0: + LOG.error(_("Failed to create OVS patch port. Cannot have " + "tunneling enabled on this agent, since this version " + "of OVS does not support tunnels or patch ports. " + "Agent terminated!")) + raise SystemExit(1) + msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY) + self.ryu_send_msg(msg) + + self._tun_br_sort_incoming_traffic_depend_in_port(self.tun_br) + self._tun_br_goto_table_ucast_unicast(self.tun_br) + self._tun_br_goto_table_flood_broad_multi_cast(self.tun_br) + self._tun_br_set_table_tun_by_tunnel_type(self.tun_br) + self._tun_br_output_patch_int(self.tun_br) + self._tun_br_goto_table_flood_unknown_unicast(self.tun_br) + self._tun_br_default_drop(self.tun_br) + + def _phys_br_prepare_create_veth(self, br, int_veth_name, phys_veth_name): + self.int_br.delete_port(int_veth_name) + br.delete_port(phys_veth_name) + if ip_lib.device_exists(int_veth_name, self.root_helper): + ip_lib.IPDevice(int_veth_name, self.root_helper).link.delete() + # Give udev a chance to process its rules here, to avoid + # race conditions between commands launched by udev rules + # and the subsequent call to ip_wrapper.add_veth + utils.execute(['/sbin/udevadm', 'settle', '--timeout=10']) + + def _phys_br_create_veth(self, br, int_veth_name, + phys_veth_name, physical_network, ip_wrapper): + int_veth, phys_veth = ip_wrapper.add_veth(int_veth_name, + phys_veth_name) + self.int_ofports[physical_network] = self.int_br.add_port(int_veth) + self.phys_ofports[physical_network] = br.add_port(phys_veth) + return (int_veth, phys_veth) + + def _phys_br_block_untranslated_traffic(self, br, physical_network): + match = br.ofparser.OFPMatch(in_port=int( + self.int_ofports[physical_network])) + msg = br.ofparser.OFPFlowMod(self.int_br.datapath, + priority=2, match=match) + self.ryu_send_msg(msg) + match = br.ofparser.OFPMatch(in_port=int( + self.phys_ofports[physical_network])) + msg = br.ofparser.OFPFlowMod(br.datapath, priority=2, match=match) + self.ryu_send_msg(msg) + + def _phys_br_enable_veth_to_pass_traffic(self, int_veth, phys_veth): + # enable veth to pass traffic + int_veth.link.set_up() + phys_veth.link.set_up() + + if self.veth_mtu: + # set up mtu size for veth interfaces + int_veth.link.set_mtu(self.veth_mtu) + phys_veth.link.set_mtu(self.veth_mtu) + + def _phys_br_patch_physical_bridge_with_integration_bridge( + self, br, physical_network, bridge, ip_wrapper): + int_veth_name = constants.VETH_INTEGRATION_PREFIX + bridge + phys_veth_name = constants.VETH_PHYSICAL_PREFIX + bridge + self._phys_br_prepare_create_veth(br, int_veth_name, phys_veth_name) + int_veth, phys_veth = self._phys_br_create_veth(br, int_veth_name, + phys_veth_name, + physical_network, + ip_wrapper) + self._phys_br_block_untranslated_traffic(br, physical_network) + self._phys_br_enable_veth_to_pass_traffic(int_veth, phys_veth) + + def setup_physical_bridges(self, bridge_mappings): + """Setup the physical network bridges. + + Creates physical network bridges and links them to the + integration bridge using veths. + + :param bridge_mappings: map physical network names to bridge names. + """ + self.phys_brs = {} + self.int_ofports = {} + self.phys_ofports = {} + ip_wrapper = ip_lib.IPWrapper(self.root_helper) + for physical_network, bridge in bridge_mappings.iteritems(): + LOG.info(_("Mapping physical network %(physical_network)s to " + "bridge %(bridge)s"), + {'physical_network': physical_network, + 'bridge': bridge}) + # setup physical bridge + if not ip_lib.device_exists(bridge, self.root_helper): + LOG.error(_("Bridge %(bridge)s for physical network " + "%(physical_network)s does not exist. Agent " + "terminated!"), + {'physical_network': physical_network, + 'bridge': bridge}) + raise SystemExit(1) + br = OVSBridge(bridge, self.root_helper, self.ryuapp) + br.setup_ofp() + msg = br.ofparser.OFPFlowMod(br.datapath, + table_id=ryu_ofp13.OFPTT_ALL, + command=ryu_ofp13.OFPFC_DELETE, + out_group=ryu_ofp13.OFPG_ANY, + out_port=ryu_ofp13.OFPP_ANY) + self.ryu_send_msg(msg) + actions = [br.ofparser.OFPActionOutput(ryu_ofp13.OFPP_NORMAL, 0)] + instructions = [br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, + actions)] + msg = br.ofparser.OFPFlowMod(br.datapath, + priority=1, + instructions=instructions) + self.ryu_send_msg(msg) + self.phys_brs[physical_network] = br + + self._phys_br_patch_physical_bridge_with_integration_bridge( + br, physical_network, bridge, ip_wrapper) + + def update_ports(self, registered_ports): + ports = self.int_br.get_vif_port_set() + if ports == registered_ports: + return + self.int_br_device_count = len(ports) + added = ports - registered_ports + removed = registered_ports - ports + return {'current': ports, + 'added': added, + 'removed': removed} + + def update_ancillary_ports(self, registered_ports): + ports = set() + for bridge in self.ancillary_brs: + ports |= bridge.get_vif_port_set() + + if ports == registered_ports: + return + added = ports - registered_ports + removed = registered_ports - ports + return {'current': ports, + 'added': added, + 'removed': removed} + + def treat_vif_port(self, vif_port, port_id, network_id, network_type, + physical_network, segmentation_id, admin_state_up): + if vif_port: + if admin_state_up: + self.port_bound(vif_port, network_id, network_type, + physical_network, segmentation_id) + else: + self.port_dead(vif_port) + else: + LOG.debug(_("No VIF port for port %s defined on agent."), port_id) + + def setup_tunnel_port(self, port_name, remote_ip, tunnel_type): + ofport = self.tun_br.add_tunnel_port(port_name, + remote_ip, + self.local_ip, + tunnel_type, + self.vxlan_udp_port) + ofport_int = -1 + try: + ofport_int = int(ofport) + except (TypeError, ValueError): + LOG.exception(_("ofport should have a value that can be " + "interpreted as an integer")) + if ofport_int < 0: + LOG.error(_("Failed to set-up %(type)s tunnel port to %(ip)s"), + {'type': tunnel_type, 'ip': remote_ip}) + return 0 + + self.tun_br_ofports[tunnel_type][remote_ip] = ofport + # Add flow in default table to resubmit to the right + # tunelling table (lvid will be set in the latter) + match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport)) + instructions = [self.tun_br.ofparser.OFPInstructionGotoTable( + table_id=constants.TUN_TABLE[tunnel_type])] + msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath, + priority=1, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + + ofports = [int(p) for p in self.tun_br_ofports[tunnel_type].values()] + if ofports: + # Update flooding flows to include the new tunnel + for network_id, vlan_mapping in self.local_vlan_map.iteritems(): + if vlan_mapping.network_type == tunnel_type: + match = self.tun_br.ofparser.OFPMatch( + vlan_vid=int(vlan_mapping.vlan) | + ryu_ofp13.OFPVID_PRESENT) + actions = [ + self.tun_br.ofparser.OFPActionPopVlan(), + self.tun_br.ofparser.OFPActionSetField( + tunnel_id=int(vlan_mapping.segmentation_id))] + actions.extend( + self.tun_br.ofparser.OFPActionOutput(p, 0) + for p in ofports + ) + instructions = [ + self.tun_br.ofparser.OFPInstructionActions( + ryu_ofp13.OFPIT_APPLY_ACTIONS, + actions)] + msg = self.tun_br.ofparser.OFPFlowMod( + self.tun_br.datapath, + table_id=constants.FLOOD_TO_TUN, + priority=1, + match=match, + instructions=instructions) + self.ryu_send_msg(msg) + return ofport + + def cleanup_tunnel_port(self, tun_ofport, tunnel_type): + # Check if this tunnel port is still used + for lvm in self.local_vlan_map.values(): + if tun_ofport in lvm.tun_ofports: + break + # If not, remove it + else: + for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items(): + if ofport == tun_ofport: + port_name = '%s-%s' % (tunnel_type, remote_ip) + self.tun_br.delete_port(port_name) + self.tun_br_ofports[tunnel_type].pop(remote_ip, None) + + def treat_devices_added(self, devices): + resync = False + self.sg_agent.prepare_devices_filter(devices) + for device in devices: + LOG.info(_("Port %s added"), device) + try: + details = self.plugin_rpc.get_device_details(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug(_("Unable to get port details for " + "%(device)s: %(e)s"), + {'device': device, 'e': e}) + resync = True + continue + port = self.int_br.get_vif_port_by_id(details['device']) + if 'port_id' in details: + LOG.info(_("Port %(device)s updated. Details: %(details)s"), + {'device': device, 'details': details}) + self.treat_vif_port(port, details['port_id'], + details['network_id'], + details['network_type'], + details['physical_network'], + details['segmentation_id'], + details['admin_state_up']) + + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, + device, + self.agent_id, + cfg.CONF.host) + else: + LOG.debug(_("Device %s not defined on plugin"), device) + if (port and int(port.ofport) != -1): + self.port_dead(port) + return resync + + def treat_ancillary_devices_added(self, devices): + resync = False + for device in devices: + LOG.info(_("Ancillary Port %s added"), device) + try: + self.plugin_rpc.get_device_details(self.context, device, + self.agent_id) + except Exception as e: + LOG.debug(_("Unable to get port details for " + "%(device)s: %(e)s"), + {'device': device, 'e': e}) + resync = True + continue + + # update plugin about port status + self.plugin_rpc.update_device_up(self.context, + device, + self.agent_id, + cfg.CONF.host) + return resync + + def treat_devices_removed(self, devices): + resync = False + self.sg_agent.remove_devices_filter(devices) + for device in devices: + LOG.info(_("Attachment %s removed"), device) + try: + self.plugin_rpc.update_device_down(self.context, + device, + self.agent_id, + cfg.CONF.host) + except Exception as e: + LOG.debug(_("port_removed failed for %(device)s: %(e)s"), + {'device': device, 'e': e}) + resync = True + continue + self.port_unbound(device) + return resync + + def treat_ancillary_devices_removed(self, devices): + resync = False + for device in devices: + LOG.info(_("Attachment %s removed"), device) + try: + details = self.plugin_rpc.update_device_down(self.context, + device, + self.agent_id, + cfg.CONF.host) + except Exception as e: + LOG.debug(_("port_removed failed for %(device)s: %(e)s"), + {'device': device, 'e': e}) + resync = True + continue + if details['exists']: + LOG.info(_("Port %s updated."), device) + # Nothing to do regarding local networking + else: + LOG.debug(_("Device %s not defined on plugin"), device) + return resync + + def process_network_ports(self, port_info): + resync_add = False + resync_removed = False + if 'added' in port_info: + start = time.time() + resync_add = self.treat_devices_added(port_info['added']) + LOG.debug(_("process_network_ports - iteration:%(iter_num)d - " + "treat_devices_added completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + if 'removed' in port_info: + start = time.time() + resync_removed = self.treat_devices_removed(port_info['removed']) + LOG.debug(_("process_network_ports - iteration:%(iter_num)d - " + "treat_devices_removed completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + # If one of the above opertaions fails => resync with plugin + return (resync_add | resync_removed) + + def process_ancillary_network_ports(self, port_info): + resync_add = False + resync_removed = False + if 'added' in port_info: + start = time.time() + resync_add = self.treat_ancillary_devices_added(port_info['added']) + LOG.debug(_("process_ancillary_network_ports - iteration: " + "%(iter_num)d - treat_ancillary_devices_added " + "completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + if 'removed' in port_info: + start = time.time() + resync_removed = self.treat_ancillary_devices_removed( + port_info['removed']) + LOG.debug(_("process_ancillary_network_ports - iteration: " + "%(iter_num)d - treat_ancillary_devices_removed " + "completed in %(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + + # If one of the above opertaions fails => resync with plugin + return (resync_add | resync_removed) + + def tunnel_sync(self): + resync = False + try: + for tunnel_type in self.tunnel_types: + details = self.plugin_rpc.tunnel_sync(self.context, + self.local_ip, + tunnel_type) + tunnels = details['tunnels'] + for tunnel in tunnels: + if self.local_ip != tunnel['ip_address']: + tunnel_id = tunnel.get('id', tunnel['ip_address']) + tun_name = '%s-%s' % (tunnel_type, tunnel_id) + self.setup_tunnel_port(tun_name, + tunnel['ip_address'], + tunnel_type) + except Exception as e: + LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"), + {'local_ip': self.local_ip, 'e': e}) + resync = True + return resync + + def ovsdb_monitor_loop(self, polling_manager=None): + if not polling_manager: + polling_manager = polling.AlwaysPoll() + + sync = True + ports = set() + ancillary_ports = set() + tunnel_sync = True + while True: + try: + start = time.time() + port_stats = {'regular': {'added': 0, 'removed': 0}, + 'ancillary': {'added': 0, 'removed': 0}} + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%d started"), + self.iter_num) + if sync: + LOG.info(_("Agent out of sync with plugin!")) + ports.clear() + ancillary_ports.clear() + sync = False + polling_manager.force_polling() + + # Notify the plugin of tunnel IP + if self.enable_tunneling and tunnel_sync: + LOG.info(_("Agent tunnel out of sync with plugin!")) + tunnel_sync = self.tunnel_sync() + if polling_manager.is_polling_required: + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%(iter_num)d - " + "starting polling. Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + port_info = self.update_ports(ports) + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%(iter_num)d - " + "port information retrieved. " + "Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + # notify plugin about port deltas + if port_info: + LOG.debug(_("Agent loop has new devices!")) + # If treat devices fails - must resync with plugin + sync = self.process_network_ports(port_info) + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%(iter_num)d - " + "ports processed. Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + ports = port_info['current'] + port_stats['regular']['added'] = ( + len(port_info.get('added', []))) + port_stats['regular']['removed'] = ( + len(port_info.get('removed', []))) + # Treat ancillary devices if they exist + if self.ancillary_brs: + port_info = self.update_ancillary_ports( + ancillary_ports) + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:%(iter_num)d - " + "ancillary port info retrieved. " + "Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + + if port_info: + rc = self.process_ancillary_network_ports( + port_info) + LOG.debug(_("Agent ovsdb_monitor_loop - " + "iteration:" + "%(iter_num)d - ancillary ports " + "processed. Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + ancillary_ports = port_info['current'] + port_stats['ancillary']['added'] = ( + len(port_info.get('added', []))) + port_stats['ancillary']['removed'] = ( + len(port_info.get('removed', []))) + sync = sync | rc + + polling_manager.polling_completed() + + except Exception: + LOG.exception(_("Error in agent event loop")) + sync = True + tunnel_sync = True + + # sleep till end of polling interval + elapsed = (time.time() - start) + LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d " + "completed. Processed ports statistics:" + "%(port_stats)s. Elapsed:%(elapsed).3f"), + {'iter_num': self.iter_num, + 'port_stats': port_stats, + 'elapsed': elapsed}) + if (elapsed < self.polling_interval): + time.sleep(self.polling_interval - elapsed) + else: + LOG.debug(_("Loop iteration exceeded interval " + "(%(polling_interval)s vs. %(elapsed)s)!"), + {'polling_interval': self.polling_interval, + 'elapsed': elapsed}) + self.iter_num = self.iter_num + 1 + + def daemon_loop(self): + with polling.get_polling_manager( + self.minimize_polling, + self.root_helper, + self.ovsdb_monitor_respawn_interval) as pm: + + self.ovsdb_monitor_loop(polling_manager=pm) + + +def create_agent_config_map(config): + """Create a map of agent config parameters. + + :param config: an instance of cfg.CONF + :returns: a map of agent configuration parameters + """ + try: + bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings) + except ValueError as e: + raise ValueError(_("Parsing bridge_mappings failed: %s.") % e) + + kwargs = dict( + integ_br=config.OVS.integration_bridge, + tun_br=config.OVS.tunnel_bridge, + local_ip=config.OVS.local_ip, + bridge_mappings=bridge_mappings, + root_helper=config.AGENT.root_helper, + polling_interval=config.AGENT.polling_interval, + minimize_polling=config.AGENT.minimize_polling, + tunnel_types=config.AGENT.tunnel_types, + veth_mtu=config.AGENT.veth_mtu, + l2_population=False, + ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN, + ) + + # If enable_tunneling is TRUE, set tunnel_type to default to GRE + if config.OVS.enable_tunneling and not kwargs['tunnel_types']: + kwargs['tunnel_types'] = [p_const.TYPE_GRE] + + # Verify the tunnel_types specified are valid + for tun in kwargs['tunnel_types']: + if tun not in constants.TUNNEL_NETWORK_TYPES: + msg = _('Invalid tunnel type specificed: %s'), tun + raise ValueError(msg) + if not kwargs['local_ip']: + msg = _('Tunneling cannot be enabled without a valid local_ip.') + raise ValueError(msg) + + return kwargs diff --git a/neutron/plugins/ofagent/common/__init__.py b/neutron/plugins/ofagent/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/plugins/ofagent/common/config.py b/neutron/plugins/ofagent/common/config.py new file mode 100644 index 000000000..759d3df1d --- /dev/null +++ b/neutron/plugins/ofagent/common/config.py @@ -0,0 +1,33 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +from oslo.config import cfg + +from neutron.agent.common import config +from neutron.plugins.openvswitch.common import config as ovs_config + + +agent_opts = [ + cfg.IntOpt('get_datapath_retry_times', default=60, + help=_("Number of seconds to retry acquiring " + "an Open vSwitch datapath")), +] + + +cfg.CONF.register_opts(ovs_config.ovs_opts, 'OVS') +cfg.CONF.register_opts(ovs_config.agent_opts, 'AGENT') +cfg.CONF.register_opts(agent_opts, 'AGENT') +config.register_agent_state_opts_helper(cfg.CONF) +config.register_root_helper(cfg.CONF) diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 8c6c10186..ead538c96 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import distutils.version as dist_version import signal import sys import time @@ -227,8 +226,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def _check_ovs_version(self): if p_const.TYPE_VXLAN in self.tunnel_types: - check_ovs_version(constants.MINIMUM_OVS_VXLAN_VERSION, - self.root_helper) + try: + ovs_lib.check_ovs_vxlan_version(self.root_helper) + except SystemError: + LOG.exception(_("Agent terminated")) + raise SystemExit(1) def _report_state(self): # How many devices are likely used by a VM @@ -1250,44 +1252,6 @@ def handle_sigterm(signum, frame): sys.exit(1) -def check_ovs_version(min_required_version, root_helper): - LOG.debug(_("Checking OVS version for VXLAN support")) - installed_klm_version = ovs_lib.get_installed_ovs_klm_version() - installed_usr_version = ovs_lib.get_installed_ovs_usr_version(root_helper) - # First check the userspace version - if installed_usr_version: - if dist_version.StrictVersion( - installed_usr_version) < dist_version.StrictVersion( - min_required_version): - LOG.error(_('Failed userspace version check for Open ' - 'vSwitch with VXLAN support. To use ' - 'VXLAN tunnels with OVS, please ensure ' - 'the OVS version is %s ' - 'or newer!'), min_required_version) - sys.exit(1) - # Now check the kernel version - if installed_klm_version: - if dist_version.StrictVersion( - installed_klm_version) < dist_version.StrictVersion( - min_required_version): - LOG.error(_('Failed kernel version check for Open ' - 'vSwitch with VXLAN support. To use ' - 'VXLAN tunnels with OVS, please ensure ' - 'the OVS version is %s or newer!'), - min_required_version) - raise SystemExit(1) - else: - LOG.warning(_('Cannot determine kernel Open vSwitch version, ' - 'please ensure your Open vSwitch kernel module ' - 'is at least version %s to support VXLAN ' - 'tunnels.'), min_required_version) - else: - LOG.warning(_('Unable to determine Open vSwitch version. Please ' - 'ensure that its version is %s or newer to use VXLAN ' - 'tunnels with OVS.'), min_required_version) - raise SystemExit(1) - - def create_agent_config_map(config): """Create a map of agent config parameters. diff --git a/neutron/tests/unit/ml2/drivers/test_ofagent_mech.py b/neutron/tests/unit/ml2/drivers/test_ofagent_mech.py new file mode 100644 index 000000000..63daf9ec0 --- /dev/null +++ b/neutron/tests/unit/ml2/drivers/test_ofagent_mech.py @@ -0,0 +1,74 @@ +# Copyright (c) 2014 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import constants +from neutron.extensions import portbindings +from neutron.plugins.ml2.drivers import mech_ofagent +from neutron.tests.unit.ml2 import _test_mech_agent as base + + +class OfagentMechanismBaseTestCase(base.AgentMechanismBaseTestCase): + VIF_TYPE = portbindings.VIF_TYPE_OVS + CAP_PORT_FILTER = True + AGENT_TYPE = constants.AGENT_TYPE_OFA + + GOOD_MAPPINGS = {'fake_physical_network': 'fake_bridge'} + GOOD_TUNNEL_TYPES = ['gre', 'vxlan'] + GOOD_CONFIGS = {'bridge_mappings': GOOD_MAPPINGS, + 'tunnel_types': GOOD_TUNNEL_TYPES} + + BAD_MAPPINGS = {'wrong_physical_network': 'wrong_bridge'} + BAD_TUNNEL_TYPES = ['bad_tunnel_type'] + BAD_CONFIGS = {'bridge_mappings': BAD_MAPPINGS, + 'tunnel_types': BAD_TUNNEL_TYPES} + + AGENTS = [{'alive': True, + 'configurations': GOOD_CONFIGS}] + AGENTS_DEAD = [{'alive': False, + 'configurations': GOOD_CONFIGS}] + AGENTS_BAD = [{'alive': False, + 'configurations': GOOD_CONFIGS}, + {'alive': True, + 'configurations': BAD_CONFIGS}] + + def setUp(self): + super(OfagentMechanismBaseTestCase, self).setUp() + self.driver = mech_ofagent.OfagentMechanismDriver() + self.driver.initialize() + + +class OfagentMechanismGenericTestCase(OfagentMechanismBaseTestCase, + base.AgentMechanismGenericTestCase): + pass + + +class OfagentMechanismLocalTestCase(OfagentMechanismBaseTestCase, + base.AgentMechanismLocalTestCase): + pass + + +class OfagentMechanismFlatTestCase(OfagentMechanismBaseTestCase, + base.AgentMechanismFlatTestCase): + pass + + +class OfagentMechanismVlanTestCase(OfagentMechanismBaseTestCase, + base.AgentMechanismVlanTestCase): + pass + + +class OfagentMechanismGreTestCase(OfagentMechanismBaseTestCase, + base.AgentMechanismGreTestCase): + pass diff --git a/neutron/tests/unit/ofagent/__init__.py b/neutron/tests/unit/ofagent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/tests/unit/ofagent/fake_oflib.py b/neutron/tests/unit/ofagent/fake_oflib.py new file mode 100644 index 000000000..822c49c5c --- /dev/null +++ b/neutron/tests/unit/ofagent/fake_oflib.py @@ -0,0 +1,43 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +import mock + + +def patch_fake_oflib_of(): + ryu_mod = mock.Mock() + ryu_base_mod = ryu_mod.base + ryu_lib_mod = ryu_mod.lib + ryu_lib_hub = ryu_lib_mod.hub + ryu_ofproto_mod = ryu_mod.ofproto + ryu_ofproto_of13 = ryu_ofproto_mod.ofproto_v1_3 + ryu_ofproto_of13.OFPTT_ALL = 0xff + ryu_ofproto_of13.OFPG_ANY = 0xffffffff + ryu_ofproto_of13.OFPP_ANY = 0xffffffff + ryu_ofproto_of13.OFPFC_ADD = 0 + ryu_ofproto_of13.OFPFC_DELETE = 3 + ryu_app_mod = ryu_mod.app + ryu_app_ofctl_mod = ryu_app_mod.ofctl + ryu_ofctl_api = ryu_app_ofctl_mod.api + return mock.patch.dict('sys.modules', + {'ryu': ryu_mod, + 'ryu.base': ryu_base_mod, + 'ryu.lib': ryu_lib_mod, + 'ryu.lib.hub': ryu_lib_hub, + 'ryu.ofproto': ryu_ofproto_mod, + 'ryu.ofproto.ofproto_v1_3': ryu_ofproto_of13, + 'ryu.app': ryu_app_mod, + 'ryu.app.ofctl': ryu_app_ofctl_mod, + 'ryu.app.ofctl.api': ryu_ofctl_api}) diff --git a/neutron/tests/unit/ofagent/test_ofa_defaults.py b/neutron/tests/unit/ofagent/test_ofa_defaults.py new file mode 100644 index 000000000..635070b51 --- /dev/null +++ b/neutron/tests/unit/ofagent/test_ofa_defaults.py @@ -0,0 +1,25 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +from oslo.config import cfg + +from neutron.plugins.ofagent.common import config # noqa +from neutron.tests import base + + +class ConfigurationTest(base.BaseTestCase): + """Configuration file Tests.""" + def test_ml2_defaults(self): + self.assertEqual(60, cfg.CONF.AGENT.get_datapath_retry_times) diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py new file mode 100644 index 000000000..1c5b34715 --- /dev/null +++ b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py @@ -0,0 +1,660 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# Based on test for openvswitch agent(test_ovs_neutron_agent.py). +# +# Copyright (c) 2012 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. + +import contextlib + +import mock +from oslo.config import cfg +import testtools + +from neutron.agent.linux import ip_lib +from neutron.agent.linux import utils +from neutron.openstack.common import importutils +from neutron.openstack.common.rpc import common as rpc_common +from neutron.plugins.common import constants as p_const +from neutron.plugins.openvswitch.common import constants +from neutron.tests import base +from neutron.tests.unit.ofagent import fake_oflib + + +NOTIFIER = ('neutron.plugins.ml2.rpc.AgentNotifierApi') + + +class OFAAgentTestCase(base.BaseTestCase): + + _AGENT_NAME = 'neutron.plugins.ofagent.agent.ofa_neutron_agent' + + def setUp(self): + super(OFAAgentTestCase, self).setUp() + self.addCleanup(mock.patch.stopall) + self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start() + self.mod_agent = importutils.import_module(self._AGENT_NAME) + self.ryuapp = mock.Mock() + cfg.CONF.register_cli_opts([ + cfg.StrOpt('ofp-listen-host', default='', + help='openflow listen host'), + cfg.IntOpt('ofp-tcp-listen-port', default=6633, + help='openflow tcp listen port') + ]) + cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT') + + +class CreateAgentConfigMap(OFAAgentTestCase): + + def test_create_agent_config_map_succeeds(self): + self.assertTrue(self.mod_agent.create_agent_config_map(cfg.CONF)) + + def test_create_agent_config_map_fails_for_invalid_tunnel_config(self): + # An ip address is required for tunneling but there is no default, + # verify this for both gre and vxlan tunnels. + cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE], + group='AGENT') + with testtools.ExpectedException(ValueError): + self.mod_agent.create_agent_config_map(cfg.CONF) + cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN], + group='AGENT') + with testtools.ExpectedException(ValueError): + self.mod_agent.create_agent_config_map(cfg.CONF) + + def test_create_agent_config_map_enable_tunneling(self): + # Verify setting only enable_tunneling will default tunnel_type to GRE + cfg.CONF.set_override('tunnel_types', None, group='AGENT') + cfg.CONF.set_override('enable_tunneling', True, group='OVS') + cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS') + cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF) + self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE]) + + def test_create_agent_config_map_fails_no_local_ip(self): + # An ip address is required for tunneling but there is no default + cfg.CONF.set_override('enable_tunneling', True, group='OVS') + with testtools.ExpectedException(ValueError): + self.mod_agent.create_agent_config_map(cfg.CONF) + + def test_create_agent_config_map_fails_for_invalid_tunnel_type(self): + cfg.CONF.set_override('tunnel_types', ['foobar'], group='AGENT') + with testtools.ExpectedException(ValueError): + self.mod_agent.create_agent_config_map(cfg.CONF) + + def test_create_agent_config_map_multiple_tunnel_types(self): + cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS') + cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE, + p_const.TYPE_VXLAN], group='AGENT') + cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF) + self.assertEqual(cfgmap['tunnel_types'], + [p_const.TYPE_GRE, p_const.TYPE_VXLAN]) + + +class TestOFANeutronAgentOVSBridge(OFAAgentTestCase): + + def setUp(self): + super(TestOFANeutronAgentOVSBridge, self).setUp() + self.br_name = 'bridge1' + self.root_helper = 'fake_helper' + self.ovs = self.mod_agent.OVSBridge( + self.br_name, self.root_helper, self.ryuapp) + + def test_find_datapath_id(self): + with mock.patch.object(self.ovs, 'get_datapath_id', + return_value='12345'): + self.ovs.find_datapath_id() + self.assertEqual(self.ovs.datapath_id, '12345') + + def _fake_get_datapath(self, app, datapath_id): + if self.ovs.retry_count >= 2: + datapath = mock.Mock() + datapath.ofproto_parser = mock.Mock() + return datapath + self.ovs.retry_count += 1 + return None + + def test_get_datapath_normal(self): + self.ovs.retry_count = 0 + with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', + new=self._fake_get_datapath): + self.ovs.datapath_id = '0x64' + self.ovs.get_datapath(retry_max=4) + self.assertEqual(self.ovs.retry_count, 2) + + def test_get_datapath_retry_out_by_default_time(self): + cfg.CONF.set_override('get_datapath_retry_times', 3, group='AGENT') + with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', + return_value=None) as mock_get_datapath: + with testtools.ExpectedException(SystemExit): + self.ovs.datapath_id = '0x64' + self.ovs.get_datapath(retry_max=3) + self.assertEqual(mock_get_datapath.call_count, 3) + + def test_get_datapath_retry_out_by_specified_time(self): + with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', + return_value=None) as mock_get_datapath: + with testtools.ExpectedException(SystemExit): + self.ovs.datapath_id = '0x64' + self.ovs.get_datapath(retry_max=2) + self.assertEqual(mock_get_datapath.call_count, 2) + + def test_setup_ofp_default_par(self): + with contextlib.nested( + mock.patch.object(self.ovs, 'set_protocols'), + mock.patch.object(self.ovs, 'set_controller'), + mock.patch.object(self.ovs, 'find_datapath_id'), + mock.patch.object(self.ovs, 'get_datapath'), + ) as (mock_set_protocols, mock_set_controller, + mock_find_datapath_id, mock_get_datapath): + self.ovs.setup_ofp() + mock_set_protocols.assert_called_with('OpenFlow13') + mock_set_controller.assert_called_with(['tcp:127.0.0.1:6633']) + mock_get_datapath.assert_called_with( + cfg.CONF.AGENT.get_datapath_retry_times) + self.assertEqual(mock_find_datapath_id.call_count, 1) + + def test_setup_ofp_specify_par(self): + controller_names = ['tcp:192.168.10.10:1234', 'tcp:172.17.16.20:5555'] + with contextlib.nested( + mock.patch.object(self.ovs, 'set_protocols'), + mock.patch.object(self.ovs, 'set_controller'), + mock.patch.object(self.ovs, 'find_datapath_id'), + mock.patch.object(self.ovs, 'get_datapath'), + ) as (mock_set_protocols, mock_set_controller, + mock_find_datapath_id, mock_get_datapath): + self.ovs.setup_ofp(controller_names=controller_names, + protocols='OpenFlow133', + retry_max=11) + mock_set_protocols.assert_called_with('OpenFlow133') + mock_set_controller.assert_called_with(controller_names) + mock_get_datapath.assert_called_with(11) + self.assertEqual(mock_find_datapath_id.call_count, 1) + + def test_setup_ofp_with_except(self): + with contextlib.nested( + mock.patch.object(self.ovs, 'set_protocols', + side_effect=RuntimeError), + mock.patch.object(self.ovs, 'set_controller'), + mock.patch.object(self.ovs, 'find_datapath_id'), + mock.patch.object(self.ovs, 'get_datapath'), + ) as (mock_set_protocols, mock_set_controller, + mock_find_datapath_id, mock_get_datapath): + with testtools.ExpectedException(SystemExit): + self.ovs.setup_ofp() + + +class TestOFANeutronAgent(OFAAgentTestCase): + + def setUp(self): + super(TestOFANeutronAgent, self).setUp() + notifier_p = mock.patch(NOTIFIER) + notifier_cls = notifier_p.start() + self.notifier = mock.Mock() + notifier_cls.return_value = self.notifier + # Avoid rpc initialization for unit tests + cfg.CONF.set_override('rpc_backend', + 'neutron.openstack.common.rpc.impl_fake') + kwargs = self.mod_agent.create_agent_config_map(cfg.CONF) + + class MockFixedIntervalLoopingCall(object): + def __init__(self, f): + self.f = f + + def start(self, interval=0): + self.f() + + with contextlib.nested( + mock.patch.object(self.mod_agent.OFANeutronAgent, + 'setup_integration_br', + return_value=mock.Mock()), + mock.patch.object(self.mod_agent.OFANeutronAgent, + 'setup_ancillary_bridges', + return_value=[]), + mock.patch.object(self.mod_agent.OVSBridge, + 'get_local_port_mac', + return_value='00:00:00:00:00:01'), + mock.patch('neutron.agent.linux.utils.get_interface_mac', + return_value='00:00:00:00:00:01'), + mock.patch('neutron.openstack.common.loopingcall.' + 'FixedIntervalLoopingCall', + new=MockFixedIntervalLoopingCall)): + self.agent = self.mod_agent.OFANeutronAgent(self.ryuapp, **kwargs) + self.agent.tun_br = mock.Mock() + self.datapath = mock.Mock() + self.ofparser = mock.Mock() + self.datapath.ofparser = self.ofparser + self.ofparser.OFPMatch = mock.Mock() + self.ofparser.OFPMatch.return_value = mock.Mock() + self.ofparser.OFPFlowMod = mock.Mock() + self.ofparser.OFPFlowMod.return_value = mock.Mock() + self.agent.int_br.ofparser = self.ofparser + + self.agent.sg_agent = mock.Mock() + + def _mock_port_bound(self, ofport=None): + port = mock.Mock() + port.ofport = ofport + net_uuid = 'my-net-uuid' + with mock.patch.object(self.mod_agent.OVSBridge, + 'set_db_attribute', + return_value=True): + with mock.patch.object(self.agent, + 'ryu_send_msg') as ryu_send_msg_func: + self.agent.port_bound(port, net_uuid, 'local', None, None) + self.assertEqual(ryu_send_msg_func.called, ofport != -1) + + def test_port_bound_deletes_flows_for_valid_ofport(self): + self._mock_port_bound(ofport=1) + + def test_port_bound_ignores_flows_for_invalid_ofport(self): + self._mock_port_bound(ofport=-1) + + def test_port_dead(self): + with mock.patch.object(self.mod_agent.OVSBridge, + 'set_db_attribute', + return_value=True): + with mock.patch.object(self.agent, + 'ryu_send_msg') as ryu_send_msg_func: + port = mock.Mock() + port.ofport = 2 + self.agent.port_dead(port) + self.assertTrue(ryu_send_msg_func.called) + + def mock_update_ports(self, vif_port_set=None, registered_ports=None): + with mock.patch.object(self.agent.int_br, 'get_vif_port_set', + return_value=vif_port_set): + return self.agent.update_ports(registered_ports) + + def test_update_ports_returns_none_for_unchanged_ports(self): + self.assertIsNone(self.mock_update_ports()) + + def test_update_ports_returns_port_changes(self): + vif_port_set = set([1, 3]) + registered_ports = set([1, 2]) + expected = dict(current=vif_port_set, added=set([3]), removed=set([2])) + actual = self.mock_update_ports(vif_port_set, registered_ports) + self.assertEqual(expected, actual) + + def test_treat_devices_added_returns_true_for_missing_device(self): + with mock.patch.object(self.agent.plugin_rpc, 'get_device_details', + side_effect=Exception()): + self.assertTrue(self.agent.treat_devices_added([{}])) + + def _mock_treat_devices_added(self, details, port, func_name): + """Mock treat devices added. + + :param details: the details to return for the device + :param port: the port that get_vif_port_by_id should return + :param func_name: the function that should be called + :returns: whether the named function was called + """ + with contextlib.nested( + mock.patch.object(self.agent.plugin_rpc, 'get_device_details', + return_value=details), + mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', + return_value=port), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent, func_name) + ) as (get_dev_fn, get_vif_func, upd_dev_up, func): + self.assertFalse(self.agent.treat_devices_added([{}])) + return func.called + + def test_treat_devices_added_ignores_invalid_ofport(self): + port = mock.Mock() + port.ofport = -1 + self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port, + 'port_dead')) + + def test_treat_devices_added_marks_unknown_port_as_dead(self): + port = mock.Mock() + port.ofport = 1 + self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port, + 'port_dead')) + + def test_treat_devices_added_updates_known_port(self): + details = mock.MagicMock() + details.__contains__.side_effect = lambda x: True + self.assertTrue(self._mock_treat_devices_added(details, + mock.Mock(), + 'treat_vif_port')) + + def test_treat_devices_removed_returns_true_for_missing_device(self): + with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', + side_effect=Exception()): + self.assertTrue(self.agent.treat_devices_removed([{}])) + + def _mock_treat_devices_removed(self, port_exists): + details = dict(exists=port_exists) + with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', + return_value=details): + with mock.patch.object(self.agent, 'port_unbound') as port_unbound: + self.assertFalse(self.agent.treat_devices_removed([{}])) + self.assertTrue(port_unbound.called) + + def test_treat_devices_removed_unbinds_port(self): + self._mock_treat_devices_removed(True) + + def test_treat_devices_removed_ignores_missing_port(self): + self._mock_treat_devices_removed(False) + + def test_process_network_ports(self): + reply = {'current': set(['tap0']), + 'removed': set(['eth0']), + 'added': set(['eth1'])} + with mock.patch.object(self.agent, 'treat_devices_added', + return_value=False) as device_added: + with mock.patch.object(self.agent, 'treat_devices_removed', + return_value=False) as device_removed: + self.assertFalse(self.agent.process_network_ports(reply)) + device_added.assert_called_once_with(set(['eth1'])) + device_removed.assert_called_once_with(set(['eth0'])) + + def test_report_state(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + self.agent.int_br_device_count = 5 + self.agent._report_state() + report_st.assert_called_with(self.agent.context, + self.agent.agent_state) + self.assertNotIn("start_flag", self.agent.agent_state) + self.assertEqual( + self.agent.agent_state["configurations"]["devices"], + self.agent.int_br_device_count + ) + + def test_network_delete(self): + with contextlib.nested( + mock.patch.object(self.agent, "reclaim_local_vlan"), + mock.patch.object(self.agent.tun_br, "cleanup_tunnel_port") + ) as (recl_fn, clean_tun_fn): + self.agent.network_delete("unused_context", + network_id="123") + self.assertFalse(recl_fn.called) + self.agent.local_vlan_map["123"] = "LVM object" + self.agent.network_delete("unused_context", + network_id="123") + self.assertFalse(clean_tun_fn.called) + recl_fn.assert_called_with("123") + + def test_port_update(self): + with contextlib.nested( + mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), + mock.patch.object(self.agent, "treat_vif_port"), + mock.patch.object(self.agent.plugin_rpc, "update_device_up"), + mock.patch.object(self.agent.plugin_rpc, "update_device_down") + ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn): + port = {"id": "123", + "network_id": "124", + "admin_state_up": False} + getvif_fn.return_value = "vif_port_obj" + self.agent.port_update("unused_context", + port=port, + network_type="vlan", + segmentation_id="1", + physical_network="physnet") + treatvif_fn.assert_called_with("vif_port_obj", "123", + "124", "vlan", "physnet", + "1", False) + upddown_fn.assert_called_with(self.agent.context, + "123", self.agent.agent_id, + cfg.CONF.host) + + port["admin_state_up"] = True + self.agent.port_update("unused_context", + port=port, + network_type="vlan", + segmentation_id="1", + physical_network="physnet") + updup_fn.assert_called_with(self.agent.context, + "123", self.agent.agent_id, + cfg.CONF.host) + + def test_port_update_plugin_rpc_failed(self): + port = {'id': 1, + 'network_id': 1, + 'admin_state_up': True} + with contextlib.nested( + mock.patch.object(self.mod_agent.LOG, 'error'), + mock.patch.object(self.agent.int_br, "get_vif_port_by_id"), + mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), + mock.patch.object(self.agent, 'port_bound'), + mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), + mock.patch.object(self.agent, 'port_dead') + ) as (log, _, device_up, _, device_down, _): + device_up.side_effect = rpc_common.Timeout + self.agent.port_update(mock.Mock(), port=port) + self.assertTrue(device_up.called) + self.assertEqual(log.call_count, 1) + + log.reset_mock() + port['admin_state_up'] = False + device_down.side_effect = rpc_common.Timeout + self.agent.port_update(mock.Mock(), port=port) + self.assertTrue(device_down.called) + self.assertEqual(log.call_count, 1) + + def test_setup_physical_bridges(self): + with contextlib.nested( + mock.patch.object(ip_lib, "device_exists"), + mock.patch.object(utils, "execute"), + mock.patch.object(self.mod_agent.OVSBridge, "add_port"), + mock.patch.object(self.mod_agent.OVSBridge, "delete_port"), + mock.patch.object(self.mod_agent.OVSBridge, "set_protocols"), + mock.patch.object(self.mod_agent.OVSBridge, "set_controller"), + mock.patch.object(self.mod_agent.OVSBridge, "get_datapath_id", + return_value='0xa'), + mock.patch.object(self.agent.int_br, "add_port"), + mock.patch.object(self.agent.int_br, "delete_port"), + mock.patch.object(ip_lib.IPWrapper, "add_veth"), + mock.patch.object(ip_lib.IpLinkCommand, "delete"), + mock.patch.object(ip_lib.IpLinkCommand, "set_up"), + mock.patch.object(ip_lib.IpLinkCommand, "set_mtu"), + mock.patch.object(self.mod_agent.ryu_api, "get_datapath", + return_value=self.datapath) + ) as (devex_fn, utilsexec_fn, + ovs_addport_fn, ovs_delport_fn, ovs_set_protocols_fn, + ovs_set_controller_fn, ovs_datapath_id_fn, br_addport_fn, + br_delport_fn, addveth_fn, linkdel_fn, linkset_fn, linkmtu_fn, + ryu_api_fn): + devex_fn.return_value = True + parent = mock.MagicMock() + parent.attach_mock(utilsexec_fn, 'utils_execute') + parent.attach_mock(linkdel_fn, 'link_delete') + parent.attach_mock(addveth_fn, 'add_veth') + addveth_fn.return_value = (ip_lib.IPDevice("int-br-eth1"), + ip_lib.IPDevice("phy-br-eth1")) + ovs_addport_fn.return_value = "25" + br_addport_fn.return_value = "11" + self.agent.setup_physical_bridges({"physnet1": "br-eth"}) + expected_calls = [mock.call.link_delete(), + mock.call.utils_execute(['/sbin/udevadm', + 'settle', + '--timeout=10']), + mock.call.add_veth('int-br-eth', + 'phy-br-eth')] + parent.assert_has_calls(expected_calls, any_order=False) + self.assertEqual(self.agent.int_ofports["physnet1"], + "11") + self.assertEqual(self.agent.phys_ofports["physnet1"], + "25") + + def test_port_unbound(self): + with mock.patch.object(self.agent, "reclaim_local_vlan") as reclvl_fn: + self.agent.enable_tunneling = True + lvm = mock.Mock() + lvm.network_type = "gre" + lvm.vif_ports = {"vif1": mock.Mock()} + self.agent.local_vlan_map["netuid12345"] = lvm + self.agent.port_unbound("vif1", "netuid12345") + self.assertTrue(reclvl_fn.called) + reclvl_fn.called = False + + lvm.vif_ports = {} + self.agent.port_unbound("vif1", "netuid12345") + self.assertEqual(reclvl_fn.call_count, 2) + + lvm.vif_ports = {"vif1": mock.Mock()} + self.agent.port_unbound("vif3", "netuid12345") + self.assertEqual(reclvl_fn.call_count, 2) + + def _check_ovs_vxlan_version(self, installed_usr_version, + installed_klm_version, + expecting_ok): + with mock.patch( + 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version' + ) as klm_cmd: + with mock.patch( + 'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version' + ) as usr_cmd: + try: + klm_cmd.return_value = installed_klm_version + usr_cmd.return_value = installed_usr_version + self.agent.tunnel_types = 'vxlan' + self.agent._check_ovs_version() + version_ok = True + except SystemExit as e: + self.assertEqual(e.code, 1) + version_ok = False + self.assertEqual(version_ok, expecting_ok) + + def test_check_minimum_version(self): + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver, + expecting_ok=True) + + def test_check_future_version(self): + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, + expecting_ok=True) + + def test_check_fail_version(self): + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, + expecting_ok=False) + + def test_check_fail_no_version(self): + self._check_ovs_vxlan_version(None, None, + expecting_ok=False) + + def test_check_fail_klm_version(self): + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + install_ver = str(float(min_vxlan_ver) - 0.01) + self._check_ovs_vxlan_version(min_vxlan_ver, install_ver, + expecting_ok=False) + + def test_daemon_loop_uses_polling_manager(self): + with mock.patch( + 'neutron.agent.linux.polling.get_polling_manager' + ) as mock_get_pm: + fake_pm = mock.Mock() + mock_get_pm.return_value = fake_pm + fake_pm.__enter__ = mock.Mock() + fake_pm.__exit__ = mock.Mock() + with mock.patch.object( + self.agent, 'ovsdb_monitor_loop' + ) as mock_loop: + self.agent.daemon_loop() + mock_get_pm.assert_called_once_with(True, 'fake_helper', + constants.DEFAULT_OVSDBMON_RESPAWN) + mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__()) + + def test_setup_tunnel_port_error_negative(self): + with contextlib.nested( + mock.patch.object(self.agent.tun_br, 'add_tunnel_port', + return_value='-1'), + mock.patch.object(self.mod_agent.LOG, 'error') + ) as (add_tunnel_port_fn, log_error_fn): + ofport = self.agent.setup_tunnel_port( + 'gre-1', 'remote_ip', p_const.TYPE_GRE) + add_tunnel_port_fn.assert_called_once_with( + 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE, + self.agent.vxlan_udp_port) + log_error_fn.assert_called_once_with( + _("Failed to set-up %(type)s tunnel port to %(ip)s"), + {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'}) + self.assertEqual(ofport, 0) + + def test_setup_tunnel_port_error_not_int(self): + with contextlib.nested( + mock.patch.object(self.agent.tun_br, 'add_tunnel_port', + return_value=None), + mock.patch.object(self.mod_agent.LOG, 'exception'), + mock.patch.object(self.mod_agent.LOG, 'error') + ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn): + ofport = self.agent.setup_tunnel_port( + 'gre-1', 'remote_ip', p_const.TYPE_GRE) + add_tunnel_port_fn.assert_called_once_with( + 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE, + self.agent.vxlan_udp_port) + log_exc_fn.assert_called_once_with( + _("ofport should have a value that can be " + "interpreted as an integer")) + log_error_fn.assert_called_once_with( + _("Failed to set-up %(type)s tunnel port to %(ip)s"), + {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'}) + self.assertEqual(ofport, 0) + + +class AncillaryBridgesTest(OFAAgentTestCase): + + def setUp(self): + super(AncillaryBridgesTest, self).setUp() + notifier_p = mock.patch(NOTIFIER) + notifier_cls = notifier_p.start() + self.notifier = mock.Mock() + notifier_cls.return_value = self.notifier + # Avoid rpc initialization for unit tests + cfg.CONF.set_override('rpc_backend', + 'neutron.openstack.common.rpc.impl_fake') + cfg.CONF.set_override('report_interval', 0, 'AGENT') + self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF) + + def _test_ancillary_bridges(self, bridges, ancillary): + device_ids = ancillary[:] + + def pullup_side_effect(self, *args): + result = device_ids.pop(0) + return result + + with contextlib.nested( + mock.patch.object(self.mod_agent.OFANeutronAgent, + 'setup_integration_br', + return_value=mock.Mock()), + mock.patch('neutron.agent.linux.utils.get_interface_mac', + return_value='00:00:00:00:00:01'), + mock.patch.object(self.mod_agent.OVSBridge, + 'get_local_port_mac', + return_value='00:00:00:00:00:01'), + mock.patch('neutron.agent.linux.ovs_lib.get_bridges', + return_value=bridges), + mock.patch( + 'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id', + side_effect=pullup_side_effect)): + self.agent = self.mod_agent.OFANeutronAgent( + self.ryuapp, **self.kwargs) + self.assertEqual(len(ancillary), len(self.agent.ancillary_brs)) + if ancillary: + bridges = [br.br_name for br in self.agent.ancillary_brs] + for br in ancillary: + self.assertIn(br, bridges) + + def test_ancillary_bridges_single(self): + bridges = ['br-int', 'br-ex'] + self._test_ancillary_bridges(bridges, ['br-ex']) + + def test_ancillary_bridges_none(self): + bridges = ['br-int'] + self._test_ancillary_bridges(bridges, []) + + def test_ancillary_bridges_multiple(self): + bridges = ['br-int', 'br-ex1', 'br-ex2'] + self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2']) diff --git a/neutron/tests/unit/openvswitch/test_ovs_lib.py b/neutron/tests/unit/openvswitch/test_ovs_lib.py index a556cf74f..c3334279a 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_lib.py +++ b/neutron/tests/unit/openvswitch/test_ovs_lib.py @@ -20,6 +20,7 @@ from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.openstack.common import jsonutils from neutron.openstack.common import uuidutils +from neutron.plugins.openvswitch.common import constants from neutron.tests import base from neutron.tests import tools @@ -131,6 +132,37 @@ class OVS_Lib_Test(base.BaseTestCase): # test __str__ str(port) + def test_set_controller(self): + controller_names = ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'] + self.br.set_controller(controller_names) + self.execute.assert_called_once_with( + ['ovs-vsctl', self.TO, '--', 'set-controller', self.BR_NAME, + 'tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'], + root_helper=self.root_helper) + + def test_del_controller(self): + self.br.del_controller() + self.execute.assert_called_once_with( + ['ovs-vsctl', self.TO, '--', 'del-controller', self.BR_NAME], + root_helper=self.root_helper) + + def test_get_controller(self): + self.execute.return_value = 'tcp:127.0.0.1:6633\ntcp:172.17.16.10:5555' + names = self.br.get_controller() + self.assertEqual(names, + ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555']) + self.execute.assert_called_once_with( + ['ovs-vsctl', self.TO, '--', 'get-controller', self.BR_NAME], + root_helper=self.root_helper) + + def test_set_protocols(self): + protocols = 'OpenFlow13' + self.br.set_protocols(protocols) + self.execute.assert_called_once_with( + ['ovs-vsctl', self.TO, '--', 'set', 'bridge', self.BR_NAME, + "protocols=%s" % protocols], + root_helper=self.root_helper) + def test_create(self): self.br.add_bridge(self.BR_NAME) @@ -666,3 +698,47 @@ class OVS_Lib_Test(base.BaseTestCase): data = [[["map", external_ids], "tap99", 1]] self.assertIsNone(self._test_get_vif_port_by_id('tap99id', data, "br-ext")) + + def _check_ovs_vxlan_version(self, installed_usr_version, + installed_klm_version, + expecting_ok): + with mock.patch( + 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version' + ) as klm_cmd: + with mock.patch( + 'neutron.agent.linux.ovs_lib.get_installed_ovs_usr_version' + ) as usr_cmd: + try: + klm_cmd.return_value = installed_klm_version + usr_cmd.return_value = installed_usr_version + ovs_lib.check_ovs_vxlan_version(root_helper='sudo') + version_ok = True + except SystemError: + version_ok = False + self.assertEqual(version_ok, expecting_ok) + + def test_check_minimum_version(self): + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver, + expecting_ok=True) + + def test_check_future_version(self): + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, + expecting_ok=True) + + def test_check_fail_version(self): + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, + expecting_ok=False) + + def test_check_fail_no_version(self): + self._check_ovs_vxlan_version(None, None, + expecting_ok=False) + + def test_check_fail_klm_version(self): + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + install_ver = str(float(min_vxlan_ver) - 0.01) + self._check_ovs_vxlan_version(min_vxlan_ver, + install_ver, + expecting_ok=False) diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 959101cab..021630846 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -468,7 +468,7 @@ class TestOvsNeutronAgent(base.BaseTestCase): self.assertEqual(reclvl_fn.call_count, 2) def _check_ovs_vxlan_version(self, installed_usr_version, - installed_klm_version, min_vers, + installed_klm_version, expecting_ok): with mock.patch( 'neutron.agent.linux.ovs_lib.get_installed_ovs_klm_version' @@ -480,8 +480,7 @@ class TestOvsNeutronAgent(base.BaseTestCase): klm_cmd.return_value = installed_klm_version usr_cmd.return_value = installed_usr_version self.agent.tunnel_types = 'vxlan' - ovs_neutron_agent.check_ovs_version(min_vers, - root_helper='sudo') + self.agent._check_ovs_version() version_ok = True except SystemExit as e: self.assertEqual(e.code, 1) @@ -489,28 +488,28 @@ class TestOvsNeutronAgent(base.BaseTestCase): self.assertEqual(version_ok, expecting_ok) def test_check_minimum_version(self): - self._check_ovs_vxlan_version('1.10', '1.10', - constants.MINIMUM_OVS_VXLAN_VERSION, + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + self._check_ovs_vxlan_version(min_vxlan_ver, min_vxlan_ver, expecting_ok=True) def test_check_future_version(self): - self._check_ovs_vxlan_version('1.11', '1.11', - constants.MINIMUM_OVS_VXLAN_VERSION, + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) + 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, expecting_ok=True) def test_check_fail_version(self): - self._check_ovs_vxlan_version('1.9', '1.9', - constants.MINIMUM_OVS_VXLAN_VERSION, + install_ver = str(float(constants.MINIMUM_OVS_VXLAN_VERSION) - 0.01) + self._check_ovs_vxlan_version(install_ver, install_ver, expecting_ok=False) def test_check_fail_no_version(self): self._check_ovs_vxlan_version(None, None, - constants.MINIMUM_OVS_VXLAN_VERSION, expecting_ok=False) def test_check_fail_klm_version(self): - self._check_ovs_vxlan_version('1.10', '1.9', - constants.MINIMUM_OVS_VXLAN_VERSION, + min_vxlan_ver = constants.MINIMUM_OVS_VXLAN_VERSION + install_ver = str(float(min_vxlan_ver) - 0.01) + self._check_ovs_vxlan_version(min_vxlan_ver, install_ver, expecting_ok=False) def _prepare_l2_pop_ofports(self): diff --git a/setup.cfg b/setup.cfg index 1193b7053..bc93250d5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -60,6 +60,7 @@ data_files = etc/neutron/plugins/ml2/ml2_conf_arista.ini etc/neutron/plugins/ml2/ml2_conf_cisco.ini etc/neutron/plugins/bigswitch/restproxy.ini + etc/neutron/plugins/ml2/ml2_conf_ofa.ini etc/neutron/plugins/mlnx = etc/neutron/plugins/mlnx/mlnx_conf.ini etc/neutron/plugins/nec = etc/neutron/plugins/nec/nec.ini etc/neutron/plugins/nicira = etc/neutron/plugins/nicira/nvp.ini @@ -125,6 +126,7 @@ console_scripts = quantum-rootwrap = oslo.rootwrap.cmd:main quantum-usage-audit = neutron.cmd.usage_audit:main neutron-metering-agent = neutron.services.metering.agents.metering_agent:main + neutron-ofagent-agent = ryu.cmd.ofa_neutron_agent:main neutron.core_plugins = bigswitch = neutron.plugins.bigswitch.plugin:NeutronRestProxyV2 brocade = neutron.plugins.brocade.NeutronPlugin:BrocadePluginV2 @@ -167,6 +169,7 @@ neutron.ml2.mechanism_drivers = cisco_nexus = neutron.plugins.ml2.drivers.cisco.nexus.mech_cisco_nexus:CiscoNexusMechanismDriver l2population = neutron.plugins.ml2.drivers.l2pop.mech_driver:L2populationMechanismDriver bigswitch = neutron.plugins.ml2.drivers.mech_bigswitch.driver:BigSwitchMechanismDriver + ofagent = neutron.plugins.ml2.drivers.mech_ofagent:OfagentMechanismDriver neutron.openstack.common.cache.backends = memory = neutron.openstack.common.cache._backends.memory:MemoryBackend