#
"""
-import sys
-
import eventlet
from eventlet import semaphore
import netaddr
proxy_cmd = ['quantum-ns-metadata-proxy',
'--pid_file=%s' % pid_file,
'--router_id=%s' % router_info.router_id,
- '--state_path=%s' % self.conf.state_path]
+ '--state_path=%s' % self.conf.state_path,
+ '--metadata_port=%s' % self.conf.metadata_port]
proxy_cmd.extend(config.get_log_args(
cfg.CONF, 'quantum-ns-metadata-proxy-%s.log' %
router_info.router_id))
agent._destroy_router_namespaces(self.conf.router_id)
self.assertEqual(agent._destroy_router_namespace.call_count, 1)
+
+
+class TestL3AgentEventHandler(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestL3AgentEventHandler, self).setUp()
+ cfg.CONF.register_opts(l3_agent.L3NATAgent.OPTS)
+ cfg.CONF.set_override(
+ 'interface_driver', 'quantum.agent.linux.interface.NullDriver'
+ )
+ cfg.CONF.set_override('use_namespaces', True)
+ agent_config.register_root_helper(cfg.CONF)
+
+ self.device_exists_p = mock.patch(
+ 'quantum.agent.linux.ip_lib.device_exists')
+ self.device_exists = self.device_exists_p.start()
+
+ self.utils_exec_p = mock.patch(
+ 'quantum.agent.linux.utils.execute')
+ self.utils_exec = self.utils_exec_p.start()
+
+ self.drv_cls_p = mock.patch('quantum.agent.linux.interface.NullDriver')
+ driver_cls = self.drv_cls_p.start()
+ self.mock_driver = mock.MagicMock()
+ self.mock_driver.DEV_NAME_LEN = (
+ interface.LinuxInterfaceDriver.DEV_NAME_LEN)
+ driver_cls.return_value = self.mock_driver
+
+ self.l3_plugin_p = mock.patch(
+ 'quantum.agent.l3_agent.L3PluginApi')
+ l3_plugin_cls = self.l3_plugin_p.start()
+ self.plugin_api = mock.Mock()
+ l3_plugin_cls.return_value = self.plugin_api
+
+ self.external_process_p = mock.patch(
+ 'quantum.agent.linux.external_process.ProcessManager'
+ )
+ self.external_process = self.external_process_p.start()
+
+ self.agent = l3_agent.L3NATAgent(HOSTNAME)
+
+ def tearDown(self):
+ self.device_exists_p.stop()
+ self.utils_exec_p.stop()
+ self.drv_cls_p.stop()
+ self.l3_plugin_p.stop()
+ self.external_process_p.stop()
+ super(TestL3AgentEventHandler, self).tearDown()
+
+ def test_spawn_metadata_proxy(self):
+ router_id = _uuid()
+ metadata_port = 8080
+ ip_class_path = 'quantum.agent.linux.ip_lib.IPWrapper'
+
+ cfg.CONF.set_override('metadata_port', metadata_port)
+ cfg.CONF.set_override('log_file', 'test.log')
+ cfg.CONF.set_override('debug', True)
+
+ router_info = l3_agent.RouterInfo(
+ router_id, cfg.CONF.root_helper, cfg.CONF.use_namespaces, None
+ )
+
+ self.external_process_p.stop()
+ try:
+ with mock.patch(ip_class_path) as ip_mock:
+ self.agent._spawn_metadata_proxy(router_info)
+ ip_mock.assert_has_calls([
+ mock.call(
+ 'sudo',
+ 'qrouter-' + router_id
+ ),
+ mock.call().netns.execute([
+ 'quantum-ns-metadata-proxy',
+ mock.ANY,
+ '--router_id=%s' % router_id,
+ mock.ANY,
+ '--metadata_port=%s' % metadata_port,
+ '--debug',
+ '--log-file=quantum-ns-metadata-proxy-%s.log' %
+ router_id
+ ])
+ ])
+ finally:
+ self.external_process_p.start()