]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
ofagent: Stop monitoring ovsdb for port changes
authorYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Thu, 31 Jul 2014 06:31:16 +0000 (15:31 +0900)
committerYAMAMOTO Takashi <yamamoto@valinux.co.jp>
Mon, 4 Aug 2014 17:31:35 +0000 (02:31 +0900)
Perform dumb polling instead.
Now ofagent port monitoring is mostly ovsdb-free.
(except _find_lost_vlan_port stuff, which is planned to be retired
by blueprint ofagent-merge-bridges)

Partially-Implements: blueprint ofagent-port-monitor
Change-Id: Ib4701f7c8ea0ee03229d207c0fbbf6f42a55aecb

neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py

index 33d849b685adfb162c010ed0446ecfbc59b0987e..95f8dc294df9210325243c4acc352cb504e07976 100644 (file)
@@ -33,7 +33,6 @@ from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
 from neutron.agent import l2population_rpc
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
-from neutron.agent.linux import polling
 from neutron.agent.linux import utils
 from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
@@ -200,9 +199,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
     def __init__(self, ryuapp, integ_br, tun_br, local_ip,
                  bridge_mappings, root_helper,
                  polling_interval, tunnel_types=None,
-                 veth_mtu=None, minimize_polling=False,
-                 ovsdb_monitor_respawn_interval=(
-                     constants.DEFAULT_OVSDBMON_RESPAWN)):
+                 veth_mtu=None):
         """Constructor.
 
         :param ryuapp: object of the ryu app.
@@ -216,11 +213,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                the agent. If set, will automatically set enable_tunneling to
                True.
         :param veth_mtu: MTU size for veth interfaces.
-        :param minimize_polling: Optional, whether to minimize polling by
-               monitoring ovsdb for interface changes.
-        :param ovsdb_monitor_respawn_interval: Optional, when using polling
-               minimization, the number of seconds to wait before respawning
-               the ovsdb monitor.
         """
         super(OFANeutronAgent, self).__init__()
         self.ryuapp = ryuapp
@@ -254,8 +246,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                                p_const.TYPE_VXLAN: {}}
 
         self.polling_interval = polling_interval
-        self.minimize_polling = minimize_polling
-        self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
 
         self.enable_tunneling = bool(self.tunnel_types)
         self.local_ip = local_ip
@@ -1260,35 +1250,27 @@ class OFANeutronAgent(n_rpc.RpcCallback,
             resync = True
         return resync
 
-    def _agent_has_updates(self, polling_manager):
-        return (polling_manager.is_polling_required or
-                self.updated_ports or
-                self.sg_agent.firewall_refresh_needed())
-
     def _port_info_has_changes(self, port_info):
         return (port_info.get('added') or
                 port_info.get('removed') or
                 port_info.get('updated'))
 
-    def ovsdb_monitor_loop(self, polling_manager=None):
-        if not polling_manager:
-            polling_manager = polling.AlwaysPoll()
+    def daemon_loop(self):
+        # TODO(yamamoto):
+        # It might be better to monitor port status async messages
 
         sync = True
         ports = set()
-        updated_ports_copy = set()
         tunnel_sync = True
         while True:
             start = time.time()
             port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
-            LOG.debug(_("Agent ovsdb_monitor_loop - "
-                      "iteration:%d started"),
+            LOG.debug("Agent daemon_loop - iteration:%d started",
                       self.iter_num)
             if sync:
                 LOG.info(_("Agent out of sync with plugin!"))
                 ports.clear()
                 sync = False
-                polling_manager.force_polling()
             # Notify the plugin of tunnel IP
             if self.enable_tunneling and tunnel_sync:
                 LOG.info(_("Agent tunnel out of sync with plugin!"))
@@ -1297,82 +1279,66 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                 except Exception:
                     LOG.exception(_("Error while synchronizing tunnels"))
                     tunnel_sync = True
-            if self._agent_has_updates(polling_manager):
-                try:
-                    LOG.debug(_("Agent ovsdb_monitor_loop - "
-                                "iteration:%(iter_num)d - "
-                                "starting polling. Elapsed:%(elapsed).3f"),
-                              {'iter_num': self.iter_num,
-                               'elapsed': time.time() - start})
-                    # Save updated ports dict to perform rollback in
-                    # case resync would be needed, and then clear
-                    # self.updated_ports. As the greenthread should not yield
-                    # between these two statements, this will be thread-safe
-                    updated_ports_copy = self.updated_ports
-                    self.updated_ports = set()
-                    port_info = self.scan_ports(ports, updated_ports_copy)
-                    ports = port_info['current']
-                    LOG.debug(_("Agent ovsdb_monitor_loop - "
-                                "iteration:%(iter_num)d - "
-                                "port information retrieved. "
-                                "Elapsed:%(elapsed).3f"),
+            LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
+                      "starting polling. Elapsed:%(elapsed).3f",
+                      {'iter_num': self.iter_num,
+                       'elapsed': time.time() - start})
+            try:
+                # Save updated ports dict to perform rollback in
+                # case resync would be needed, and then clear
+                # self.updated_ports. As the greenthread should not yield
+                # between these two statements, this will be thread-safe
+                updated_ports_copy = self.updated_ports
+                self.updated_ports = set()
+                port_info = self.scan_ports(ports, updated_ports_copy)
+                ports = port_info['current']
+                LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
+                          "port information retrieved. "
+                          "Elapsed:%(elapsed).3f",
+                          {'iter_num': self.iter_num,
+                           'elapsed': time.time() - start})
+                # Secure and wire/unwire VIFs and update their status
+                # on Neutron server
+                if (self._port_info_has_changes(port_info) or
+                    self.sg_agent.firewall_refresh_needed()):
+                    LOG.debug("Starting to process devices in:%s",
+                              port_info)
+                    # If treat devices fails - must resync with plugin
+                    sync = self.process_network_ports(port_info)
+                    LOG.debug("Agent daemon_loop - "
+                              "iteration:%(iter_num)d - "
+                              "ports processed. Elapsed:%(elapsed).3f",
                               {'iter_num': self.iter_num,
                                'elapsed': time.time() - start})
-                    # Secure and wire/unwire VIFs and update their status
-                    # on Neutron server
-                    if (self._port_info_has_changes(port_info) or
-                        self.sg_agent.firewall_refresh_needed()):
-                        LOG.debug(_("Starting to process devices in:%s"),
-                                  port_info)
-                        # If treat devices fails - must resync with plugin
-                        sync = self.process_network_ports(port_info)
-                        LOG.debug(_("Agent ovsdb_monitor_loop - "
-                                    "iteration:%(iter_num)d - "
-                                    "ports processed. Elapsed:%(elapsed).3f"),
-                                  {'iter_num': self.iter_num,
-                                   'elapsed': time.time() - start})
-                        port_stats['regular']['added'] = (
-                            len(port_info.get('added', [])))
-                        port_stats['regular']['updated'] = (
-                            len(port_info.get('updated', [])))
-                        port_stats['regular']['removed'] = (
-                            len(port_info.get('removed', [])))
-
-                    polling_manager.polling_completed()
-                except Exception:
-                    LOG.exception(_("Error while processing VIF ports"))
-                    # Put the ports back in self.updated_port
-                    self.updated_ports |= updated_ports_copy
-                    sync = True
+                    port_stats['regular']['added'] = (
+                        len(port_info.get('added', [])))
+                    port_stats['regular']['updated'] = (
+                        len(port_info.get('updated', [])))
+                    port_stats['regular']['removed'] = (
+                        len(port_info.get('removed', [])))
+            except Exception:
+                LOG.exception(_("Error while processing VIF ports"))
+                # Put the ports back in self.updated_port
+                self.updated_ports |= updated_ports_copy
+                sync = True
 
             # sleep till end of polling interval
             elapsed = (time.time() - start)
-            LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
-                        "completed. Processed ports statistics:"
-                        "%(port_stats)s. Elapsed:%(elapsed).3f"),
+            LOG.debug("Agent daemon_loop - iteration:%(iter_num)d "
+                      "completed. Processed ports statistics:"
+                      "%(port_stats)s. Elapsed:%(elapsed).3f",
                       {'iter_num': self.iter_num,
                        'port_stats': port_stats,
                        'elapsed': elapsed})
             if (elapsed < self.polling_interval):
                 time.sleep(self.polling_interval - elapsed)
             else:
-                LOG.debug(_("Loop iteration exceeded interval "
-                            "(%(polling_interval)s vs. %(elapsed)s)!"),
+                LOG.debug("Loop iteration exceeded interval "
+                          "(%(polling_interval)s vs. %(elapsed)s)!",
                           {'polling_interval': self.polling_interval,
                            'elapsed': elapsed})
             self.iter_num = self.iter_num + 1
 
-    def daemon_loop(self):
-        # TODO(yamamoto): make polling logic stop using ovsdb monitor
-        # - make it a dumb periodic polling
-        # - or, monitor port status async messages
-        with polling.get_polling_manager(
-                self.minimize_polling,
-                self.root_helper,
-                self.ovsdb_monitor_respawn_interval) as pm:
-
-            self.ovsdb_monitor_loop(polling_manager=pm)
-
 
 def create_agent_config_map(config):
     """Create a map of agent config parameters.
@@ -1392,10 +1358,8 @@ def create_agent_config_map(config):
         bridge_mappings=bridge_mappings,
         root_helper=config.AGENT.root_helper,
         polling_interval=config.AGENT.polling_interval,
-        minimize_polling=config.AGENT.minimize_polling,
         tunnel_types=config.AGENT.tunnel_types,
         veth_mtu=config.AGENT.veth_mtu,
-        ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
     )
 
     # If enable_tunneling is TRUE, set tunnel_type to default to GRE
index 1180b1de66e43506780ec4e187ddf38f46b2470b..d79a266123355a321cc14f2825c77ac248fb53c4 100644 (file)
@@ -808,22 +808,6 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
             self.agent.reclaim_local_vlan(self.lvms[1].net)
             del_port_fn.assert_called_once_with(self.tun_name2)
 
-    def test_daemon_loop_uses_polling_manager(self):
-        with mock.patch(
-            'neutron.agent.linux.polling.get_polling_manager'
-        ) as mock_get_pm:
-            fake_pm = mock.Mock()
-            mock_get_pm.return_value = fake_pm
-            fake_pm.__enter__ = mock.Mock()
-            fake_pm.__exit__ = mock.Mock()
-            with mock.patch.object(
-                self.agent, 'ovsdb_monitor_loop'
-            ) as mock_loop:
-                self.agent.daemon_loop()
-        mock_get_pm.assert_called_once_with(True, 'fake_helper',
-                                            constants.DEFAULT_OVSDBMON_RESPAWN)
-        mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
-
     def test__setup_tunnel_port_error_negative(self):
         with contextlib.nested(
             mock.patch.object(self.agent.tun_br, 'add_tunnel_port',