from neutron.agent import l2population_rpc
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
-from neutron.agent.linux import polling
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
def __init__(self, ryuapp, integ_br, tun_br, local_ip,
bridge_mappings, root_helper,
polling_interval, tunnel_types=None,
- veth_mtu=None, minimize_polling=False,
- ovsdb_monitor_respawn_interval=(
- constants.DEFAULT_OVSDBMON_RESPAWN)):
+ veth_mtu=None):
"""Constructor.
:param ryuapp: object of the ryu app.
the agent. If set, will automatically set enable_tunneling to
True.
:param veth_mtu: MTU size for veth interfaces.
- :param minimize_polling: Optional, whether to minimize polling by
- monitoring ovsdb for interface changes.
- :param ovsdb_monitor_respawn_interval: Optional, when using polling
- minimization, the number of seconds to wait before respawning
- the ovsdb monitor.
"""
super(OFANeutronAgent, self).__init__()
self.ryuapp = ryuapp
p_const.TYPE_VXLAN: {}}
self.polling_interval = polling_interval
- self.minimize_polling = minimize_polling
- self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
self.enable_tunneling = bool(self.tunnel_types)
self.local_ip = local_ip
resync = True
return resync
- def _agent_has_updates(self, polling_manager):
- return (polling_manager.is_polling_required or
- self.updated_ports or
- self.sg_agent.firewall_refresh_needed())
-
def _port_info_has_changes(self, port_info):
return (port_info.get('added') or
port_info.get('removed') or
port_info.get('updated'))
- def ovsdb_monitor_loop(self, polling_manager=None):
- if not polling_manager:
- polling_manager = polling.AlwaysPoll()
+ def daemon_loop(self):
+ # TODO(yamamoto):
+ # It might be better to monitor port status async messages
sync = True
ports = set()
- updated_ports_copy = set()
tunnel_sync = True
while True:
start = time.time()
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:%d started"),
+ LOG.debug("Agent daemon_loop - iteration:%d started",
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
- polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
- if self._agent_has_updates(polling_manager):
- try:
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:%(iter_num)d - "
- "starting polling. Elapsed:%(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- # Save updated ports dict to perform rollback in
- # case resync would be needed, and then clear
- # self.updated_ports. As the greenthread should not yield
- # between these two statements, this will be thread-safe
- updated_ports_copy = self.updated_ports
- self.updated_ports = set()
- port_info = self.scan_ports(ports, updated_ports_copy)
- ports = port_info['current']
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:%(iter_num)d - "
- "port information retrieved. "
- "Elapsed:%(elapsed).3f"),
+ LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
+ "starting polling. Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ try:
+ # Save updated ports dict to perform rollback in
+ # case resync would be needed, and then clear
+ # self.updated_ports. As the greenthread should not yield
+ # between these two statements, this will be thread-safe
+ updated_ports_copy = self.updated_ports
+ self.updated_ports = set()
+ port_info = self.scan_ports(ports, updated_ports_copy)
+ ports = port_info['current']
+ LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
+ "port information retrieved. "
+ "Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ # Secure and wire/unwire VIFs and update their status
+ # on Neutron server
+ if (self._port_info_has_changes(port_info) or
+ self.sg_agent.firewall_refresh_needed()):
+ LOG.debug("Starting to process devices in:%s",
+ port_info)
+ # If treat devices fails - must resync with plugin
+ sync = self.process_network_ports(port_info)
+ LOG.debug("Agent daemon_loop - "
+ "iteration:%(iter_num)d - "
+ "ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
- # Secure and wire/unwire VIFs and update their status
- # on Neutron server
- if (self._port_info_has_changes(port_info) or
- self.sg_agent.firewall_refresh_needed()):
- LOG.debug(_("Starting to process devices in:%s"),
- port_info)
- # If treat devices fails - must resync with plugin
- sync = self.process_network_ports(port_info)
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:%(iter_num)d - "
- "ports processed. Elapsed:%(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- port_stats['regular']['added'] = (
- len(port_info.get('added', [])))
- port_stats['regular']['updated'] = (
- len(port_info.get('updated', [])))
- port_stats['regular']['removed'] = (
- len(port_info.get('removed', [])))
-
- polling_manager.polling_completed()
- except Exception:
- LOG.exception(_("Error while processing VIF ports"))
- # Put the ports back in self.updated_port
- self.updated_ports |= updated_ports_copy
- sync = True
+ port_stats['regular']['added'] = (
+ len(port_info.get('added', [])))
+ port_stats['regular']['updated'] = (
+ len(port_info.get('updated', [])))
+ port_stats['regular']['removed'] = (
+ len(port_info.get('removed', [])))
+ except Exception:
+ LOG.exception(_("Error while processing VIF ports"))
+ # Put the ports back in self.updated_port
+ self.updated_ports |= updated_ports_copy
+ sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
- LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
- "completed. Processed ports statistics:"
- "%(port_stats)s. Elapsed:%(elapsed).3f"),
+ LOG.debug("Agent daemon_loop - iteration:%(iter_num)d "
+ "completed. Processed ports statistics:"
+ "%(port_stats)s. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'port_stats': port_stats,
'elapsed': elapsed})
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
- LOG.debug(_("Loop iteration exceeded interval "
- "(%(polling_interval)s vs. %(elapsed)s)!"),
+ LOG.debug("Loop iteration exceeded interval "
+ "(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
self.iter_num = self.iter_num + 1
- def daemon_loop(self):
- # TODO(yamamoto): make polling logic stop using ovsdb monitor
- # - make it a dumb periodic polling
- # - or, monitor port status async messages
- with polling.get_polling_manager(
- self.minimize_polling,
- self.root_helper,
- self.ovsdb_monitor_respawn_interval) as pm:
-
- self.ovsdb_monitor_loop(polling_manager=pm)
-
def create_agent_config_map(config):
"""Create a map of agent config parameters.
bridge_mappings=bridge_mappings,
root_helper=config.AGENT.root_helper,
polling_interval=config.AGENT.polling_interval,
- minimize_polling=config.AGENT.minimize_polling,
tunnel_types=config.AGENT.tunnel_types,
veth_mtu=config.AGENT.veth_mtu,
- ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
)
# If enable_tunneling is TRUE, set tunnel_type to default to GRE