From f151caea9bc884716a19763386d86e4a1fb55242 Mon Sep 17 00:00:00 2001 From: YAMAMOTO Takashi Date: Thu, 31 Jul 2014 15:31:16 +0900 Subject: [PATCH] ofagent: Stop monitoring ovsdb for port changes Perform dumb polling instead. Now ofagent port monitoring is mostly ovsdb-free. (except _find_lost_vlan_port stuff, which is planned to be retired by blueprint ofagent-merge-bridges) Partially-Implements: blueprint ofagent-port-monitor Change-Id: Ib4701f7c8ea0ee03229d207c0fbbf6f42a55aecb --- .../ofagent/agent/ofa_neutron_agent.py | 136 +++++++----------- .../unit/ofagent/test_ofa_neutron_agent.py | 16 --- 2 files changed, 50 insertions(+), 102 deletions(-) diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 33d849b68..95f8dc294 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -33,7 +33,6 @@ from ryu.ofproto import ofproto_v1_3 as ryu_ofp13 from neutron.agent import l2population_rpc from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib -from neutron.agent.linux import polling from neutron.agent.linux import utils from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc @@ -200,9 +199,7 @@ class OFANeutronAgent(n_rpc.RpcCallback, def __init__(self, ryuapp, integ_br, tun_br, local_ip, bridge_mappings, root_helper, polling_interval, tunnel_types=None, - veth_mtu=None, minimize_polling=False, - ovsdb_monitor_respawn_interval=( - constants.DEFAULT_OVSDBMON_RESPAWN)): + veth_mtu=None): """Constructor. :param ryuapp: object of the ryu app. @@ -216,11 +213,6 @@ class OFANeutronAgent(n_rpc.RpcCallback, the agent. If set, will automatically set enable_tunneling to True. :param veth_mtu: MTU size for veth interfaces. - :param minimize_polling: Optional, whether to minimize polling by - monitoring ovsdb for interface changes. - :param ovsdb_monitor_respawn_interval: Optional, when using polling - minimization, the number of seconds to wait before respawning - the ovsdb monitor. """ super(OFANeutronAgent, self).__init__() self.ryuapp = ryuapp @@ -254,8 +246,6 @@ class OFANeutronAgent(n_rpc.RpcCallback, p_const.TYPE_VXLAN: {}} self.polling_interval = polling_interval - self.minimize_polling = minimize_polling - self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval self.enable_tunneling = bool(self.tunnel_types) self.local_ip = local_ip @@ -1260,35 +1250,27 @@ class OFANeutronAgent(n_rpc.RpcCallback, resync = True return resync - def _agent_has_updates(self, polling_manager): - return (polling_manager.is_polling_required or - self.updated_ports or - self.sg_agent.firewall_refresh_needed()) - def _port_info_has_changes(self, port_info): return (port_info.get('added') or port_info.get('removed') or port_info.get('updated')) - def ovsdb_monitor_loop(self, polling_manager=None): - if not polling_manager: - polling_manager = polling.AlwaysPoll() + def daemon_loop(self): + # TODO(yamamoto): + # It might be better to monitor port status async messages sync = True ports = set() - updated_ports_copy = set() tunnel_sync = True while True: start = time.time() port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}} - LOG.debug(_("Agent ovsdb_monitor_loop - " - "iteration:%d started"), + LOG.debug("Agent daemon_loop - iteration:%d started", self.iter_num) if sync: LOG.info(_("Agent out of sync with plugin!")) ports.clear() sync = False - polling_manager.force_polling() # Notify the plugin of tunnel IP if self.enable_tunneling and tunnel_sync: LOG.info(_("Agent tunnel out of sync with plugin!")) @@ -1297,82 +1279,66 @@ class OFANeutronAgent(n_rpc.RpcCallback, except Exception: LOG.exception(_("Error while synchronizing tunnels")) tunnel_sync = True - if self._agent_has_updates(polling_manager): - try: - LOG.debug(_("Agent ovsdb_monitor_loop - " - "iteration:%(iter_num)d - " - "starting polling. Elapsed:%(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - # Save updated ports dict to perform rollback in - # case resync would be needed, and then clear - # self.updated_ports. As the greenthread should not yield - # between these two statements, this will be thread-safe - updated_ports_copy = self.updated_ports - self.updated_ports = set() - port_info = self.scan_ports(ports, updated_ports_copy) - ports = port_info['current'] - LOG.debug(_("Agent ovsdb_monitor_loop - " - "iteration:%(iter_num)d - " - "port information retrieved. " - "Elapsed:%(elapsed).3f"), + LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - " + "starting polling. Elapsed:%(elapsed).3f", + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + try: + # Save updated ports dict to perform rollback in + # case resync would be needed, and then clear + # self.updated_ports. As the greenthread should not yield + # between these two statements, this will be thread-safe + updated_ports_copy = self.updated_ports + self.updated_ports = set() + port_info = self.scan_ports(ports, updated_ports_copy) + ports = port_info['current'] + LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - " + "port information retrieved. " + "Elapsed:%(elapsed).3f", + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + # Secure and wire/unwire VIFs and update their status + # on Neutron server + if (self._port_info_has_changes(port_info) or + self.sg_agent.firewall_refresh_needed()): + LOG.debug("Starting to process devices in:%s", + port_info) + # If treat devices fails - must resync with plugin + sync = self.process_network_ports(port_info) + LOG.debug("Agent daemon_loop - " + "iteration:%(iter_num)d - " + "ports processed. Elapsed:%(elapsed).3f", {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - # Secure and wire/unwire VIFs and update their status - # on Neutron server - if (self._port_info_has_changes(port_info) or - self.sg_agent.firewall_refresh_needed()): - LOG.debug(_("Starting to process devices in:%s"), - port_info) - # If treat devices fails - must resync with plugin - sync = self.process_network_ports(port_info) - LOG.debug(_("Agent ovsdb_monitor_loop - " - "iteration:%(iter_num)d - " - "ports processed. Elapsed:%(elapsed).3f"), - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - port_stats['regular']['added'] = ( - len(port_info.get('added', []))) - port_stats['regular']['updated'] = ( - len(port_info.get('updated', []))) - port_stats['regular']['removed'] = ( - len(port_info.get('removed', []))) - - polling_manager.polling_completed() - except Exception: - LOG.exception(_("Error while processing VIF ports")) - # Put the ports back in self.updated_port - self.updated_ports |= updated_ports_copy - sync = True + port_stats['regular']['added'] = ( + len(port_info.get('added', []))) + port_stats['regular']['updated'] = ( + len(port_info.get('updated', []))) + port_stats['regular']['removed'] = ( + len(port_info.get('removed', []))) + except Exception: + LOG.exception(_("Error while processing VIF ports")) + # Put the ports back in self.updated_port + self.updated_ports |= updated_ports_copy + sync = True # sleep till end of polling interval elapsed = (time.time() - start) - LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d " - "completed. Processed ports statistics:" - "%(port_stats)s. Elapsed:%(elapsed).3f"), + LOG.debug("Agent daemon_loop - iteration:%(iter_num)d " + "completed. Processed ports statistics:" + "%(port_stats)s. Elapsed:%(elapsed).3f", {'iter_num': self.iter_num, 'port_stats': port_stats, 'elapsed': elapsed}) if (elapsed < self.polling_interval): time.sleep(self.polling_interval - elapsed) else: - LOG.debug(_("Loop iteration exceeded interval " - "(%(polling_interval)s vs. %(elapsed)s)!"), + LOG.debug("Loop iteration exceeded interval " + "(%(polling_interval)s vs. %(elapsed)s)!", {'polling_interval': self.polling_interval, 'elapsed': elapsed}) self.iter_num = self.iter_num + 1 - def daemon_loop(self): - # TODO(yamamoto): make polling logic stop using ovsdb monitor - # - make it a dumb periodic polling - # - or, monitor port status async messages - with polling.get_polling_manager( - self.minimize_polling, - self.root_helper, - self.ovsdb_monitor_respawn_interval) as pm: - - self.ovsdb_monitor_loop(polling_manager=pm) - def create_agent_config_map(config): """Create a map of agent config parameters. @@ -1392,10 +1358,8 @@ def create_agent_config_map(config): bridge_mappings=bridge_mappings, root_helper=config.AGENT.root_helper, polling_interval=config.AGENT.polling_interval, - minimize_polling=config.AGENT.minimize_polling, tunnel_types=config.AGENT.tunnel_types, veth_mtu=config.AGENT.veth_mtu, - ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN, ) # If enable_tunneling is TRUE, set tunnel_type to default to GRE diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py index 1180b1de6..d79a26612 100644 --- a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py +++ b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py @@ -808,22 +808,6 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase): self.agent.reclaim_local_vlan(self.lvms[1].net) del_port_fn.assert_called_once_with(self.tun_name2) - def test_daemon_loop_uses_polling_manager(self): - with mock.patch( - 'neutron.agent.linux.polling.get_polling_manager' - ) as mock_get_pm: - fake_pm = mock.Mock() - mock_get_pm.return_value = fake_pm - fake_pm.__enter__ = mock.Mock() - fake_pm.__exit__ = mock.Mock() - with mock.patch.object( - self.agent, 'ovsdb_monitor_loop' - ) as mock_loop: - self.agent.daemon_loop() - mock_get_pm.assert_called_once_with(True, 'fake_helper', - constants.DEFAULT_OVSDBMON_RESPAWN) - mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__()) - def test__setup_tunnel_port_error_negative(self): with contextlib.nested( mock.patch.object(self.agent.tun_br, 'add_tunnel_port', -- 2.45.2