from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import loopingcall
+from neutron.openstack.common import service
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.common import config # noqa
from neutron.plugins.linuxbridge.common import constants as lconst
getattr(self, method)(context, values)
-class LinuxBridgeNeutronAgentRPC(object):
+class LinuxBridgeNeutronAgentRPC(service.Service):
- def __init__(self, interface_mappings, polling_interval):
+ def __init__(self, interface_mappings, polling_interval,
+ quitting_rpc_timeout):
+ """Constructor.
+
+ :param interface_mappings: dict mapping physical_networks to
+ physical_interfaces.
+ :param polling_interval: interval (secs) to poll DB.
+ :param quitting_rpc_timeout: timeout in seconds for rpc calls after
+ stop is called.
+ """
+ super(LinuxBridgeNeutronAgentRPC, self).__init__()
+ self.interface_mappings = interface_mappings
self.polling_interval = polling_interval
- self.setup_linux_bridge(interface_mappings)
- configurations = {'interface_mappings': interface_mappings}
+ self.quitting_rpc_timeout = quitting_rpc_timeout
+
+ def start(self):
+ self.setup_linux_bridge(self.interface_mappings)
+ configurations = {'interface_mappings': self.interface_mappings}
if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc)
- self.setup_rpc(interface_mappings.values())
+ self.setup_rpc(self.interface_mappings.values())
+ self.daemon_loop()
+
+ def stop(self, graceful=True):
+ LOG.info(_LI("Stopping linuxbridge agent."))
+ if graceful and self.quitting_rpc_timeout:
+ self.set_rpc_timeout(self.quitting_rpc_timeout)
+ super(LinuxBridgeNeutronAgentRPC, self).stop(graceful)
+
+ def reset(self):
+ common_config.setup_logging()
def _report_state(self):
try:
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
+ def set_rpc_timeout(self, timeout):
+ for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
+ self.state_rpc):
+ rpc_api.client.timeout = timeout
+
def main():
common_config.init(sys.argv[1:])
LOG.info(_LI("Interface mappings: %s"), interface_mappings)
polling_interval = cfg.CONF.AGENT.polling_interval
+ quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout
agent = LinuxBridgeNeutronAgentRPC(interface_mappings,
- polling_interval)
+ polling_interval,
+ quitting_rpc_timeout)
LOG.info(_LI("Agent initialized successfully, now running... "))
- agent.daemon_loop()
- sys.exit(0)
+ launcher = service.launch(agent)
+ launcher.wait()
if __name__ == "__main__":
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
+ cfg.CONF.set_default('quitting_rpc_timeout', 10, 'AGENT')
self.get_devices_p = mock.patch.object(ip_lib.IPWrapper, 'get_devices')
self.get_devices = self.get_devices_p.start()
self.get_devices.return_value = [ip_lib.IPDevice('eth77')]
with mock.patch.object(linuxbridge_neutron_agent.LinuxBridgeManager,
'get_interface_by_ip', return_value=None):
self.agent = linuxbridge_neutron_agent.LinuxBridgeNeutronAgentRPC(
- {}, 0)
+ {}, 0, cfg.CONF.AGENT.quitting_rpc_timeout)
+ with mock.patch.object(self.agent, "daemon_loop"):
+ self.agent.start()
def test_treat_devices_removed_with_existed_device(self):
agent = self.agent
agent.remove_port_binding.assert_called_with('net123', 'port123')
self.assertFalse(agent.plugin_rpc.update_device_up.called)
+ def test_set_rpc_timeout(self):
+ self.agent.stop()
+ for rpc_client in (self.agent.plugin_rpc.client,
+ self.agent.sg_plugin_rpc.client,
+ self.agent.state_rpc.client):
+ self.assertEqual(cfg.CONF.AGENT.quitting_rpc_timeout,
+ rpc_client.timeout)
+
+ def test_set_rpc_timeout_no_value(self):
+ self.agent.quitting_rpc_timeout = None
+ with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
+ self.agent.stop()
+ self.assertFalse(mock_set_rpc.called)
+
class TestLinuxBridgeManager(base.BaseTestCase):
def setUp(self):