From 9d13ea884bff749b3975acb5eb5630e5aca4a665 Mon Sep 17 00:00:00 2001 From: YAMAMOTO Takashi Date: Tue, 3 Jun 2014 15:53:22 +0900 Subject: [PATCH] ofagent: Use port desc to monitor ports on br-int also, use ofport.name as device identifiers. sprinkle some TODO comments. Implements: blueprint ofagent-port-monitor Change-Id: I4995964f37729f954fec71c4a2e61e463a430b1a --- .../ofagent/agent/ofa_neutron_agent.py | 60 +++++++++++--- neutron/plugins/ofagent/agent/ports.py | 27 +++++++ neutron/tests/unit/ofagent/fake_oflib.py | 12 ++- .../unit/ofagent/test_ofa_neutron_agent.py | 79 +++++++++++++------ neutron/tests/unit/ofagent/test_ofa_ports.py | 32 ++++++++ 5 files changed, 174 insertions(+), 36 deletions(-) create mode 100644 neutron/plugins/ofagent/agent/ports.py create mode 100644 neutron/tests/unit/ofagent/test_ofa_ports.py diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 7ff3040b0..5cc77ec4e 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -41,6 +41,7 @@ from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const +from neutron.plugins.ofagent.agent import ports from neutron.plugins.ofagent.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -302,6 +303,36 @@ class OFANeutronAgent(rpc_compat.RpcCallback, self._report_state) heartbeat.start(interval=report_interval) + @staticmethod + def _get_ofport_name(interface_id): + """Convert from neutron device id (uuid) to OpenFlow port name. + + This needs to be synced with ML2 plugin's _device_to_port_id(). + + An assumption: The switch uses an OS's interface name as the + corresponding OpenFlow port name. + NOTE(yamamoto): While it's true for Open vSwitch, it isn't + necessarily true everywhere. For example, LINC uses something + like "LogicalSwitch0-Port2". + """ + return "tap" + interface_id[0:11] + + def _get_ports(self, br): + """Generate ports.Port instances for the given bridge.""" + datapath = br.datapath + ofpp = datapath.ofproto_parser + msg = ofpp.OFPPortDescStatsRequest(datapath=datapath) + descs = ryu_api.send_msg(app=self.ryuapp, msg=msg, + reply_cls=ofpp.OFPPortDescStatsReply, + reply_multi=True) + for d in descs: + for p in d.body: + yield ports.Port.from_ofp_port(p) + + def _get_ofport_names(self, br): + """Return a set of OpenFlow port names for the given bridge.""" + return set(p.port_name for p in self._get_ports(br)) + def get_net_uuid(self, vif_id): for network_id, vlan_mapping in self.local_vlan_map.iteritems(): if vif_id in vlan_mapping.vif_ports: @@ -323,7 +354,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, # Even if full port details might be provided to this call, # they are not used since there is no guarantee the notifications # are processed in the same order as the relevant API requests - self.updated_ports.add(port['id']) + self.updated_ports.add(self._get_ofport_name(port['id'])) LOG.debug(_("port_update received port %s"), port['id']) def tunnel_update(self, context, **kwargs): @@ -609,7 +640,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, self.provision_local_vlan(net_uuid, network_type, physical_network, segmentation_id) lvm = self.local_vlan_map[net_uuid] - lvm.vif_ports[port.vif_id] = port + lvm.vif_ports[port.port_name] = port # Do not bind a port if it's already bound cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag") if cur_tag != str(lvm.vlan): @@ -920,7 +951,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, br, physical_network, bridge, ip_wrapper) def scan_ports(self, registered_ports, updated_ports=None): - cur_ports = self.int_br.get_vif_port_set() + cur_ports = self._get_ofport_names(self.int_br) self.int_br_device_count = len(cur_ports) port_info = {'current': cur_ports} if updated_ports is None: @@ -950,25 +981,32 @@ class OFANeutronAgent(rpc_compat.RpcCallback, The returned value is a set of port ids of the ports concerned by a vlan tag loss. """ + # TODO(yamamoto): stop using ovsdb + # an idea is to use metadata instead of tagged vlans. + # cf. blueprint ofagent-merge-bridges port_tags = self.int_br.get_port_tag_dict() changed_ports = set() for lvm in self.local_vlan_map.values(): for port in registered_ports: if ( port in lvm.vif_ports - and lvm.vif_ports[port].port_name in port_tags - and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan + and port in port_tags + and port_tags[port] != lvm.vlan ): LOG.info( _("Port '%(port_name)s' has lost " "its vlan tag '%(vlan_tag)d'!"), - {'port_name': lvm.vif_ports[port].port_name, + {'port_name': port, 'vlan_tag': lvm.vlan} ) changed_ports.add(port) return changed_ports def update_ancillary_ports(self, registered_ports): + # TODO(yamamoto): stop using ovsdb + # - do the same as scan_ports + # - or, find a way to update status of ancillary ports differently + # eg. let interface drivers mark ports up ports = set() for bridge in self.ancillary_brs: ports |= bridge.get_vif_port_set() @@ -990,7 +1028,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, # error condition of which operators should be aware if not vif_port.ofport: LOG.warn(_("VIF port: %s has no ofport configured, and might " - "not be able to transmit"), vif_port.vif_id) + "not be able to transmit"), vif_port.port_name) if admin_state_up: self.port_bound(vif_port, network_id, network_type, physical_network, segmentation_id) @@ -1060,16 +1098,17 @@ class OFANeutronAgent(rpc_compat.RpcCallback, def treat_devices_added_or_updated(self, devices): resync = False + all_ports = dict((p.port_name, p) for p in self._get_ports()) for device in devices: LOG.debug(_("Processing port %s"), device) - port = self.int_br.get_vif_port_by_id(device) - if not port: + if device not in all_ports: # The port has disappeared and should not be processed # There is no need to put the port DOWN in the plugin as # it never went up in the first place LOG.info(_("Port %s was not found on the integration bridge " "and will therefore not be processed"), device) continue + port = all_ports[device] try: details = self.plugin_rpc.get_device_details(self.context, device, @@ -1378,6 +1417,9 @@ class OFANeutronAgent(rpc_compat.RpcCallback, self.iter_num = self.iter_num + 1 def daemon_loop(self): + # TODO(yamamoto): make polling logic stop using ovsdb monitor + # - make it a dumb periodic polling + # - or, monitor port status async messages with polling.get_polling_manager( self.minimize_polling, self.root_helper, diff --git a/neutron/plugins/ofagent/agent/ports.py b/neutron/plugins/ofagent/agent/ports.py new file mode 100644 index 000000000..c78c5cd6c --- /dev/null +++ b/neutron/plugins/ofagent/agent/ports.py @@ -0,0 +1,27 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K. + + +class Port(object): + def __init__(self, port_name, ofport): + self.port_name = port_name + self.ofport = ofport + + @classmethod + def from_ofp_port(cls, ofp_port): + """Convert from ryu OFPPort.""" + return cls(port_name=ofp_port.name, ofport=ofp_port.port_no) diff --git a/neutron/tests/unit/ofagent/fake_oflib.py b/neutron/tests/unit/ofagent/fake_oflib.py index 68d21cb61..28a479cb5 100644 --- a/neutron/tests/unit/ofagent/fake_oflib.py +++ b/neutron/tests/unit/ofagent/fake_oflib.py @@ -62,7 +62,7 @@ def _mkcls(name): self._hist = [] def __getattr__(self, name): - return name + return self._kwargs[name] def __repr__(self): args = map(repr, self._args) @@ -74,6 +74,8 @@ def _mkcls(name): class _Mod(object): + _cls_cache = {} + def __init__(self, name): self._name = name @@ -81,7 +83,13 @@ class _Mod(object): fullname = '%s.%s' % (self._name, name) if '_' in name: # constants are named like OFPxxx_yyy_zzz return _SimpleValue(fullname) - return _mkcls(fullname) + try: + return self._cls_cache[fullname] + except KeyError: + pass + cls = _mkcls(fullname) + self._cls_cache[fullname] = cls + return cls def __repr__(self): return 'Mod(%s)' % (self._name,) diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py index 6df549a2d..3413c22b1 100644 --- a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py +++ b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py @@ -26,7 +26,6 @@ from oslo.config import cfg import testtools from neutron.agent.linux import ip_lib -from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.openstack.common import importutils from neutron.plugins.common import constants as p_const @@ -333,12 +332,12 @@ class TestOFANeutronAgent(OFAAgentTestCase): def test_port_dead_with_port_already_dead(self): self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG) - def mock_scan_ports(self, vif_port_set=None, registered_ports=None, + def mock_scan_ports(self, port_set=None, registered_ports=None, updated_ports=None, port_tags_dict=None): port_tags_dict = port_tags_dict or {} with contextlib.nested( - mock.patch.object(self.agent.int_br, 'get_vif_port_set', - return_value=vif_port_set), + mock.patch.object(self.agent, '_get_ofport_names', + return_value=port_set), mock.patch.object(self.agent.int_br, 'get_port_tag_dict', return_value=port_tags_dict) ): @@ -395,31 +394,33 @@ class TestOFANeutronAgent(OFAAgentTestCase): self.assertEqual(expected, actual) def test_update_ports_returns_lost_vlan_port(self): - br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp) - mac = "ca:fe:de:ad:be:ef" - port = ovs_lib.VifPort(1, 1, 1, mac, br) + port = mock.Mock(port_name='tap00000001-00', ofport=1) lvm = self.mod_agent.LocalVLANMapping( - 1, '1', None, 1, {port.vif_id: port}) + vlan=1, network_type='1', physical_network=None, segmentation_id=1, + vif_ports={port.port_name: port}) local_vlan_map = {'1': lvm} - vif_port_set = set([1, 3]) - registered_ports = set([1, 2]) - port_tags_dict = {1: []} + port_set = set(['tap00000001-00', + 'tap00000003-00']) + registered_ports = set(['tap00000001-00', 'tap00000002-00']) + port_tags_dict = {'tap00000001-00': []} expected = dict( - added=set([3]), current=vif_port_set, - removed=set([2]), updated=set([1]) + added=set(['tap00000003-00']), + current=set(['tap00000001-00', 'tap00000003-00']), + removed=set(['tap00000002-00']), + updated=set(['tap00000001-00']) ) with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map): actual = self.mock_scan_ports( - vif_port_set, registered_ports, port_tags_dict=port_tags_dict) + port_set, registered_ports, port_tags_dict=port_tags_dict) self.assertEqual(expected, actual) def test_treat_devices_added_returns_true_for_missing_device(self): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()), - mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', - return_value=mock.Mock())): - self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) + mock.patch.object(self.agent, '_get_ports', + return_value=[mock.Mock(port_name='xxx')])): + self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx'])) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. @@ -432,13 +433,14 @@ class TestOFANeutronAgent(OFAAgentTestCase): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', return_value=details), - mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', - return_value=port), + mock.patch.object(self.agent, '_get_ports', + return_value=[port]), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated( + [port.port_name])) return func.called def test_treat_devices_added_updated_ignores_invalid_ofport(self): @@ -478,14 +480,15 @@ class TestOFANeutronAgent(OFAAgentTestCase): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', return_value=fake_details_dict), - mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', - return_value=mock.MagicMock()), + mock.patch.object(self.agent, '_get_ports', + return_value=[mock.Mock(port_name='xxx')]), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, 'treat_vif_port') ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, treat_vif_port): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated( + ['xxx'])) self.assertTrue(treat_vif_port.called) self.assertTrue(upd_dev_down.called) @@ -561,7 +564,7 @@ class TestOFANeutronAgent(OFAAgentTestCase): recl_fn.assert_called_with("123") def test_port_update(self): - port = {"id": "123", + port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7", "network_id": "124", "admin_state_up": False} self.agent.port_update("unused_context", @@ -569,7 +572,7 @@ class TestOFANeutronAgent(OFAAgentTestCase): network_type="vlan", segmentation_id="1", physical_network="physnet") - self.assertEqual(set(['123']), self.agent.updated_ports) + self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports) def test_setup_physical_bridges(self): with contextlib.nested( @@ -966,6 +969,32 @@ class TestOFANeutronAgent(OFAAgentTestCase): table_id=ofp.OFPTT_ALL) sendmsg.assert_has_calls([mock.call(expected_msg)]) + def test__get_ports(self): + ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') + reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge', + port_no=8)])] + sendmsg = mock.Mock(return_value=reply) + self.mod_agent.ryu_api.send_msg = sendmsg + result = self.agent._get_ports(self.agent.int_br) + result = list(result) # convert generator to list. + self.assertEqual(1, len(result)) + self.assertEqual('hoge', result[0].port_name) + self.assertEqual(8, result[0].ofport) + expected_msg = ofpp.OFPPortDescStatsRequest( + datapath=self.agent.int_br.datapath) + sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp, + msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply, + reply_multi=True)]) + + def test__get_ofport_names(self): + names = ['p111', 'p222', 'p333'] + ps = [mock.Mock(port_name=x, ofport=names.index(x)) for x in names] + with mock.patch.object(self.agent, '_get_ports', + return_value=ps) as _get_ports: + result = self.agent._get_ofport_names('hoge') + _get_ports.assert_called_once_with('hoge') + self.assertEqual(set(names), result) + class AncillaryBridgesTest(OFAAgentTestCase): diff --git a/neutron/tests/unit/ofagent/test_ofa_ports.py b/neutron/tests/unit/ofagent/test_ofa_ports.py new file mode 100644 index 000000000..7cd2bc60f --- /dev/null +++ b/neutron/tests/unit/ofagent/test_ofa_ports.py @@ -0,0 +1,32 @@ +# Copyright (C) 2014 VA Linux Systems Japan K.K. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K. + + +import mock + +from neutron.plugins.ofagent.agent import ports +from neutron.tests import base + + +class TestOFAgentPorts(base.BaseTestCase): + def test_port(self): + p1 = ports.Port(port_name='foo', ofport=999) + ryu_ofp_port = mock.Mock(port_no=999) + ryu_ofp_port.name = 'foo' + p2 = ports.Port.from_ofp_port(ofp_port=ryu_ofp_port) + self.assertEqual(p1.port_name, p2.port_name) + self.assertEqual(p1.ofport, p2.ofport) -- 2.45.2