]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
ofagent: Use port desc to monitor ports on br-int
authorYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Tue, 3 Jun 2014 06:53:22 +0000 (15:53 +0900)
committerYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Thu, 26 Jun 2014 03:29:01 +0000 (12:29 +0900)
also, use ofport.name as device identifiers.
sprinkle some TODO comments.

Implements: blueprint ofagent-port-monitor
Change-Id: I4995964f37729f954fec71c4a2e61e463a430b1a

neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/ofagent/agent/ports.py [new file with mode: 0644]
neutron/tests/unit/ofagent/fake_oflib.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py
neutron/tests/unit/ofagent/test_ofa_ports.py [new file with mode: 0644]

index 7ff3040b067e7ea6f0caccf26388af4c49e8b4aa..5cc77ec4ecc11771ce7f16439397a10575fcb449 100644 (file)
@@ -41,6 +41,7 @@ from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
 from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
+from neutron.plugins.ofagent.agent import ports
 from neutron.plugins.ofagent.common import config  # noqa
 from neutron.plugins.openvswitch.common import constants
 
@@ -302,6 +303,36 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
                 self._report_state)
             heartbeat.start(interval=report_interval)
 
+    @staticmethod
+    def _get_ofport_name(interface_id):
+        """Convert from neutron device id (uuid) to OpenFlow port name.
+
+        This needs to be synced with ML2 plugin's _device_to_port_id().
+
+        An assumption: The switch uses an OS's interface name as the
+        corresponding OpenFlow port name.
+        NOTE(yamamoto): While it's true for Open vSwitch, it isn't
+        necessarily true everywhere.  For example, LINC uses something
+        like "LogicalSwitch0-Port2".
+        """
+        return "tap" + interface_id[0:11]
+
+    def _get_ports(self, br):
+        """Generate ports.Port instances for the given bridge."""
+        datapath = br.datapath
+        ofpp = datapath.ofproto_parser
+        msg = ofpp.OFPPortDescStatsRequest(datapath=datapath)
+        descs = ryu_api.send_msg(app=self.ryuapp, msg=msg,
+                                 reply_cls=ofpp.OFPPortDescStatsReply,
+                                 reply_multi=True)
+        for d in descs:
+            for p in d.body:
+                yield ports.Port.from_ofp_port(p)
+
+    def _get_ofport_names(self, br):
+        """Return a set of OpenFlow port names for the given bridge."""
+        return set(p.port_name for p in self._get_ports(br))
+
     def get_net_uuid(self, vif_id):
         for network_id, vlan_mapping in self.local_vlan_map.iteritems():
             if vif_id in vlan_mapping.vif_ports:
@@ -323,7 +354,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
         # Even if full port details might be provided to this call,
         # they are not used since there is no guarantee the notifications
         # are processed in the same order as the relevant API requests
-        self.updated_ports.add(port['id'])
+        self.updated_ports.add(self._get_ofport_name(port['id']))
         LOG.debug(_("port_update received port %s"), port['id'])
 
     def tunnel_update(self, context, **kwargs):
@@ -609,7 +640,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
             self.provision_local_vlan(net_uuid, network_type,
                                       physical_network, segmentation_id)
         lvm = self.local_vlan_map[net_uuid]
-        lvm.vif_ports[port.vif_id] = port
+        lvm.vif_ports[port.port_name] = port
         # Do not bind a port if it's already bound
         cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
         if cur_tag != str(lvm.vlan):
@@ -920,7 +951,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
                 br, physical_network, bridge, ip_wrapper)
 
     def scan_ports(self, registered_ports, updated_ports=None):
-        cur_ports = self.int_br.get_vif_port_set()
+        cur_ports = self._get_ofport_names(self.int_br)
         self.int_br_device_count = len(cur_ports)
         port_info = {'current': cur_ports}
         if updated_ports is None:
@@ -950,25 +981,32 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
         The returned value is a set of port ids of the ports concerned by a
         vlan tag loss.
         """
+        # TODO(yamamoto): stop using ovsdb
+        # an idea is to use metadata instead of tagged vlans.
+        # cf. blueprint ofagent-merge-bridges
         port_tags = self.int_br.get_port_tag_dict()
         changed_ports = set()
         for lvm in self.local_vlan_map.values():
             for port in registered_ports:
                 if (
                     port in lvm.vif_ports
-                    and lvm.vif_ports[port].port_name in port_tags
-                    and port_tags[lvm.vif_ports[port].port_name] != lvm.vlan
+                    and port in port_tags
+                    and port_tags[port] != lvm.vlan
                 ):
                     LOG.info(
                         _("Port '%(port_name)s' has lost "
                             "its vlan tag '%(vlan_tag)d'!"),
-                        {'port_name': lvm.vif_ports[port].port_name,
+                        {'port_name': port,
                          'vlan_tag': lvm.vlan}
                     )
                     changed_ports.add(port)
         return changed_ports
 
     def update_ancillary_ports(self, registered_ports):
+        # TODO(yamamoto): stop using ovsdb
+        # - do the same as scan_ports
+        # - or, find a way to update status of ancillary ports differently
+        #   eg. let interface drivers mark ports up
         ports = set()
         for bridge in self.ancillary_brs:
             ports |= bridge.get_vif_port_set()
@@ -990,7 +1028,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
             # error condition of which operators should be aware
             if not vif_port.ofport:
                 LOG.warn(_("VIF port: %s has no ofport configured, and might "
-                           "not be able to transmit"), vif_port.vif_id)
+                           "not be able to transmit"), vif_port.port_name)
             if admin_state_up:
                 self.port_bound(vif_port, network_id, network_type,
                                 physical_network, segmentation_id)
@@ -1060,16 +1098,17 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
 
     def treat_devices_added_or_updated(self, devices):
         resync = False
+        all_ports = dict((p.port_name, p) for p in self._get_ports())
         for device in devices:
             LOG.debug(_("Processing port %s"), device)
-            port = self.int_br.get_vif_port_by_id(device)
-            if not port:
+            if device not in all_ports:
                 # The port has disappeared and should not be processed
                 # There is no need to put the port DOWN in the plugin as
                 # it never went up in the first place
                 LOG.info(_("Port %s was not found on the integration bridge "
                            "and will therefore not be processed"), device)
                 continue
+            port = all_ports[device]
             try:
                 details = self.plugin_rpc.get_device_details(self.context,
                                                              device,
@@ -1378,6 +1417,9 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
             self.iter_num = self.iter_num + 1
 
     def daemon_loop(self):
+        # TODO(yamamoto): make polling logic stop using ovsdb monitor
+        # - make it a dumb periodic polling
+        # - or, monitor port status async messages
         with polling.get_polling_manager(
                 self.minimize_polling,
                 self.root_helper,
diff --git a/neutron/plugins/ofagent/agent/ports.py b/neutron/plugins/ofagent/agent/ports.py
new file mode 100644 (file)
index 0000000..c78c5cd
--- /dev/null
@@ -0,0 +1,27 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
+
+
+class Port(object):
+    def __init__(self, port_name, ofport):
+        self.port_name = port_name
+        self.ofport = ofport
+
+    @classmethod
+    def from_ofp_port(cls, ofp_port):
+        """Convert from ryu OFPPort."""
+        return cls(port_name=ofp_port.name, ofport=ofp_port.port_no)
index 68d21cb6142a2669b7de105a7833159cccb7239c..28a479cb5fcf115a3f32dedf7e7bc0b24b36eae0 100644 (file)
@@ -62,7 +62,7 @@ def _mkcls(name):
             self._hist = []
 
         def __getattr__(self, name):
-            return name
+            return self._kwargs[name]
 
         def __repr__(self):
             args = map(repr, self._args)
@@ -74,6 +74,8 @@ def _mkcls(name):
 
 
 class _Mod(object):
+    _cls_cache = {}
+
     def __init__(self, name):
         self._name = name
 
@@ -81,7 +83,13 @@ class _Mod(object):
         fullname = '%s.%s' % (self._name, name)
         if '_' in name:  # constants are named like OFPxxx_yyy_zzz
             return _SimpleValue(fullname)
-        return _mkcls(fullname)
+        try:
+            return self._cls_cache[fullname]
+        except KeyError:
+            pass
+        cls = _mkcls(fullname)
+        self._cls_cache[fullname] = cls
+        return cls
 
     def __repr__(self):
         return 'Mod(%s)' % (self._name,)
index 6df549a2d7bac1fa3743f9b6ecff3b74b773f928..3413c22b10a0159ac4b7ed04c56cb96fe9f3ecb6 100644 (file)
@@ -26,7 +26,6 @@ from oslo.config import cfg
 import testtools
 
 from neutron.agent.linux import ip_lib
-from neutron.agent.linux import ovs_lib
 from neutron.agent.linux import utils
 from neutron.openstack.common import importutils
 from neutron.plugins.common import constants as p_const
@@ -333,12 +332,12 @@ class TestOFANeutronAgent(OFAAgentTestCase):
     def test_port_dead_with_port_already_dead(self):
         self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
 
-    def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
+    def mock_scan_ports(self, port_set=None, registered_ports=None,
                         updated_ports=None, port_tags_dict=None):
         port_tags_dict = port_tags_dict or {}
         with contextlib.nested(
-            mock.patch.object(self.agent.int_br, 'get_vif_port_set',
-                              return_value=vif_port_set),
+            mock.patch.object(self.agent, '_get_ofport_names',
+                              return_value=port_set),
             mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
                               return_value=port_tags_dict)
         ):
@@ -395,31 +394,33 @@ class TestOFANeutronAgent(OFAAgentTestCase):
         self.assertEqual(expected, actual)
 
     def test_update_ports_returns_lost_vlan_port(self):
-        br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp)
-        mac = "ca:fe:de:ad:be:ef"
-        port = ovs_lib.VifPort(1, 1, 1, mac, br)
+        port = mock.Mock(port_name='tap00000001-00', ofport=1)
         lvm = self.mod_agent.LocalVLANMapping(
-            1, '1', None, 1, {port.vif_id: port})
+            vlan=1, network_type='1', physical_network=None, segmentation_id=1,
+            vif_ports={port.port_name: port})
         local_vlan_map = {'1': lvm}
-        vif_port_set = set([1, 3])
-        registered_ports = set([1, 2])
-        port_tags_dict = {1: []}
+        port_set = set(['tap00000001-00',
+                        'tap00000003-00'])
+        registered_ports = set(['tap00000001-00', 'tap00000002-00'])
+        port_tags_dict = {'tap00000001-00': []}
         expected = dict(
-            added=set([3]), current=vif_port_set,
-            removed=set([2]), updated=set([1])
+            added=set(['tap00000003-00']),
+            current=set(['tap00000001-00', 'tap00000003-00']),
+            removed=set(['tap00000002-00']),
+            updated=set(['tap00000001-00'])
         )
         with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map):
             actual = self.mock_scan_ports(
-                vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
+                port_set, registered_ports, port_tags_dict=port_tags_dict)
         self.assertEqual(expected, actual)
 
     def test_treat_devices_added_returns_true_for_missing_device(self):
         with contextlib.nested(
             mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
                               side_effect=Exception()),
-            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
-                              return_value=mock.Mock())):
-            self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
+            mock.patch.object(self.agent, '_get_ports',
+                              return_value=[mock.Mock(port_name='xxx')])):
+            self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx']))
 
     def _mock_treat_devices_added_updated(self, details, port, func_name):
         """Mock treat devices added or updated.
@@ -432,13 +433,14 @@ class TestOFANeutronAgent(OFAAgentTestCase):
         with contextlib.nested(
             mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
                               return_value=details),
-            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
-                              return_value=port),
+            mock.patch.object(self.agent, '_get_ports',
+                              return_value=[port]),
             mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
             mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
             mock.patch.object(self.agent, func_name)
         ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertFalse(self.agent.treat_devices_added_or_updated(
+                [port.port_name]))
         return func.called
 
     def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@@ -478,14 +480,15 @@ class TestOFANeutronAgent(OFAAgentTestCase):
         with contextlib.nested(
             mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
                               return_value=fake_details_dict),
-            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
-                              return_value=mock.MagicMock()),
+            mock.patch.object(self.agent, '_get_ports',
+                              return_value=[mock.Mock(port_name='xxx')]),
             mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
             mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
             mock.patch.object(self.agent, 'treat_vif_port')
         ) as (get_dev_fn, get_vif_func, upd_dev_up,
               upd_dev_down, treat_vif_port):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertFalse(self.agent.treat_devices_added_or_updated(
+                ['xxx']))
             self.assertTrue(treat_vif_port.called)
             self.assertTrue(upd_dev_down.called)
 
@@ -561,7 +564,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
             recl_fn.assert_called_with("123")
 
     def test_port_update(self):
-        port = {"id": "123",
+        port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
                 "network_id": "124",
                 "admin_state_up": False}
         self.agent.port_update("unused_context",
@@ -569,7 +572,7 @@ class TestOFANeutronAgent(OFAAgentTestCase):
                                network_type="vlan",
                                segmentation_id="1",
                                physical_network="physnet")
-        self.assertEqual(set(['123']), self.agent.updated_ports)
+        self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports)
 
     def test_setup_physical_bridges(self):
         with contextlib.nested(
@@ -966,6 +969,32 @@ class TestOFANeutronAgent(OFAAgentTestCase):
             table_id=ofp.OFPTT_ALL)
         sendmsg.assert_has_calls([mock.call(expected_msg)])
 
+    def test__get_ports(self):
+        ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
+        reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge',
+                                                               port_no=8)])]
+        sendmsg = mock.Mock(return_value=reply)
+        self.mod_agent.ryu_api.send_msg = sendmsg
+        result = self.agent._get_ports(self.agent.int_br)
+        result = list(result)  # convert generator to list.
+        self.assertEqual(1, len(result))
+        self.assertEqual('hoge', result[0].port_name)
+        self.assertEqual(8, result[0].ofport)
+        expected_msg = ofpp.OFPPortDescStatsRequest(
+            datapath=self.agent.int_br.datapath)
+        sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp,
+            msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply,
+            reply_multi=True)])
+
+    def test__get_ofport_names(self):
+        names = ['p111', 'p222', 'p333']
+        ps = [mock.Mock(port_name=x, ofport=names.index(x)) for x in names]
+        with mock.patch.object(self.agent, '_get_ports',
+                               return_value=ps) as _get_ports:
+            result = self.agent._get_ofport_names('hoge')
+        _get_ports.assert_called_once_with('hoge')
+        self.assertEqual(set(names), result)
+
 
 class AncillaryBridgesTest(OFAAgentTestCase):
 
diff --git a/neutron/tests/unit/ofagent/test_ofa_ports.py b/neutron/tests/unit/ofagent/test_ofa_ports.py
new file mode 100644 (file)
index 0000000..7cd2bc6
--- /dev/null
@@ -0,0 +1,32 @@
+# Copyright (C) 2014 VA Linux Systems Japan K.K.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
+
+
+import mock
+
+from neutron.plugins.ofagent.agent import ports
+from neutron.tests import base
+
+
+class TestOFAgentPorts(base.BaseTestCase):
+    def test_port(self):
+        p1 = ports.Port(port_name='foo', ofport=999)
+        ryu_ofp_port = mock.Mock(port_no=999)
+        ryu_ofp_port.name = 'foo'
+        p2 = ports.Port.from_ofp_port(ofp_port=ryu_ofp_port)
+        self.assertEqual(p1.port_name, p2.port_name)
+        self.assertEqual(p1.ofport, p2.ofport)