import uuid
import mock
+import testscenarios
+
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.utils import importutils
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
-HOST = 'my_l3_host'
-FIRST_L3_AGENT = {
- 'binary': 'neutron-l3-agent',
- 'host': HOST,
- 'topic': topics.L3_AGENT,
- 'configurations': {},
- 'agent_type': constants.AGENT_TYPE_L3,
- 'start_flag': True
-}
-
-HOST_2 = 'my_l3_host_2'
-SECOND_L3_AGENT = {
- 'binary': 'neutron-l3-agent',
- 'host': HOST_2,
- 'topic': topics.L3_AGENT,
- 'configurations': {},
- 'agent_type': constants.AGENT_TYPE_L3,
- 'start_flag': True
-}
-
-HOST_3 = 'my_l3_host_3'
-THIRD_L3_AGENT = {
- 'binary': 'neutron-l3-agent',
- 'host': HOST_3,
- 'topic': topics.L3_AGENT,
- 'configurations': {},
- 'agent_type': constants.AGENT_TYPE_L3,
- 'start_flag': True
-}
-
-HOST_4 = 'my_l3_host_4'
-FOURTH_L3_AGENT = {
- 'binary': 'neutron-l3-agent',
- 'host': HOST_4,
- 'topic': topics.L3_AGENT,
- 'configurations': {},
- 'agent_type': constants.AGENT_TYPE_L3,
- 'start_flag': True
-}
+# the below code is required for the following reason
+# (as documented in testscenarios)
+"""Multiply tests depending on their 'scenarios' attribute.
+ This can be assigned to 'load_tests' in any test module to make this
+ automatically work across tests in the module.
+"""
+load_tests = testscenarios.load_tests_apply_scenarios
HOST_DVR = 'my_l3_host_dvr'
DVR_L3_AGENT = {
class L3SchedulerBaseMixin(object):
- def _register_l3_agent(self, agent, plugin=None):
+ def _register_l3_agent(self, host, agent_mode='legacy', plugin=None):
if not plugin:
plugin = self.plugin
+ agent = {
+ 'binary': 'neutron-l3-agent',
+ 'host': host,
+ 'topic': topics.L3_AGENT,
+ 'configurations': {'agent_mode': agent_mode},
+ 'agent_type': constants.AGENT_TYPE_L3,
+ 'start_flag': True
+ }
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': agent},
return agent_db[0]
def _register_l3_agents(self, plugin=None):
- self.agent1 = self._register_l3_agent(FIRST_L3_AGENT, plugin)
+ self.agent1 = self._register_l3_agent('host_1', plugin=plugin)
self.agent_id1 = self.agent1.id
- self.agent2 = self._register_l3_agent(SECOND_L3_AGENT, plugin)
+ self.agent2 = self._register_l3_agent('host_2', plugin=plugin)
self.agent_id2 = self.agent2.id
def _register_l3_dvr_agents(self):
# test legacy agent_mode case: only legacy agent should be candidate
router['distributed'] = False
- exp_host = FIRST_L3_AGENT.get('host')
+ exp_host = 'host_1'
self._check_get_l3_agent_candidates(router, agent_list, exp_host)
def test_get_l3_agent_candidates_dvr(self):
def _prepare_check_ports_exist_tests(self):
l3_agent = agents_db.Agent()
l3_agent.admin_state_up = True
- l3_agent.host = HOST
+ l3_agent.host = 'host_1'
router = self._make_router(self.fmt,
tenant_id=str(uuid.uuid4()),
name='r2')
getp.return_value = self.plugin
# matching subnet
port = {'subnet_id': str(uuid.uuid4()),
- 'binding:host_id': HOST,
+ 'binding:host_id': 'host_1',
'device_owner': 'compute:',
'id': 1234}
self.plugin.get_ports.return_value = [port]
self._set_l3_agent_admin_state(self.adminContext,
self.agent_id1, True)
self.plugin.auto_schedule_routers(self.adminContext,
- FIRST_L3_AGENT['host'],
+ 'host_1',
[r1['router']['id']])
agents = self.get_l3_agents_hosting_routers(
self.adminContext, [r1['router']['id']],
admin_state_up=True)
- self.assertEqual(FIRST_L3_AGENT['host'], agents[0]['host'])
+ self.assertEqual('host_1', agents[0]['host'])
class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCase):
super(L3_HA_scheduler_db_mixinTestCase,
self)._register_l3_agents(plugin=plugin)
- self.agent3 = self._register_l3_agent(THIRD_L3_AGENT, plugin)
+ self.agent3 = self._register_l3_agent('host_3', plugin=plugin)
self.agent_id3 = self.agent3.id
- self.agent4 = self._register_l3_agent(FOURTH_L3_AGENT, plugin)
+ self.agent4 = self._register_l3_agent('host_4', plugin=plugin)
self.agent_id4 = self.agent4.id
def test_get_ha_routers_l3_agents_count(self):
self.assertIn(self.agent_id1, agent_ids)
self.assertIn(self.agent_id2, agent_ids)
- agent = self._register_l3_agent(THIRD_L3_AGENT)
+ agent = self._register_l3_agent('host_3')
self.agent_id3 = agent.id
routers_to_auto_schedule = [router['id']] if specific_router else []
self.plugin.auto_schedule_routers(self.adminContext,
- THIRD_L3_AGENT['host'],
+ 'host_3',
routers_to_auto_schedule)
agents = self.plugin.get_l3_agents_hosting_routers(
# Simulate agent restart to make sure we don't try to re-bind
self.plugin.auto_schedule_routers(self.adminContext,
- THIRD_L3_AGENT['host'],
+ 'host_3',
routers_to_auto_schedule)
def test_scheduler_with_ha_enabled_not_enough_agent(self):
super(L3HALeastRoutersSchedulerTestCase,
self)._register_l3_agents(plugin=plugin)
- agent = self._register_l3_agent(THIRD_L3_AGENT, plugin)
+ agent = self._register_l3_agent('host_3', plugin=plugin)
self.agent_id3 = agent.id
- agent = self._register_l3_agent(FOURTH_L3_AGENT, plugin)
+ agent = self._register_l3_agent('host_4', plugin=plugin)
self.agent_id4 = agent.id
def setUp(self):
agent_ids = [agent['id'] for agent in agents]
self.assertIn(self.agent_id3, agent_ids)
self.assertIn(self.agent_id4, agent_ids)
+
+
+class TestGetL3AgentsWithAgentModeFilter(testlib_api.SqlTestCase,
+ testlib_plugin.PluginSetupHelper,
+ L3SchedulerBaseMixin):
+ """Test cases to test get_l3_agents.
+
+ This class tests the L3AgentSchedulerDbMixin.get_l3_agents()
+ for the 'agent_mode' filter with various values.
+
+ 5 l3 agents are registered in the order - legacy, dvr_snat, dvr, fake_mode
+ and legacy
+ """
+
+ scenarios = [
+ ('no filter',
+ dict(agent_modes=[],
+ expected_agent_modes=['legacy', 'dvr_snat', 'dvr',
+ 'fake_mode', 'legacy'])),
+
+ ('legacy',
+ dict(agent_modes=['legacy'],
+ expected_agent_modes=['legacy', 'legacy'])),
+
+ ('dvr_snat',
+ dict(agent_modes=['dvr_snat'],
+ expected_agent_modes=['dvr_snat'])),
+
+ ('dvr ',
+ dict(agent_modes=['dvr'],
+ expected_agent_modes=['dvr'])),
+
+ ('legacy and dvr snat',
+ dict(agent_modes=['legacy', 'dvr_snat', 'legacy'],
+ expected_agent_modes=['legacy', 'dvr_snat', 'legacy'])),
+
+ ('legacy and dvr',
+ dict(agent_modes=['legacy', 'dvr'],
+ expected_agent_modes=['legacy', 'dvr', 'legacy'])),
+
+ ('dvr_snat and dvr',
+ dict(agent_modes=['dvr_snat', 'dvr'],
+ expected_agent_modes=['dvr_snat', 'dvr'])),
+
+ ('legacy, dvr_snat and dvr',
+ dict(agent_modes=['legacy', 'dvr_snat', 'dvr'],
+ expected_agent_modes=['legacy', 'dvr_snat', 'dvr',
+ 'legacy'])),
+
+ ('invalid',
+ dict(agent_modes=['invalid'],
+ expected_agent_modes=[])),
+ ]
+
+ def setUp(self):
+ super(TestGetL3AgentsWithAgentModeFilter, self).setUp()
+ self.plugin = L3HAPlugin()
+ self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')
+ self.adminContext = q_context.get_admin_context()
+ hosts = ['host_1', 'host_2', 'host_3', 'host_4', 'host_5']
+ agent_modes = ['legacy', 'dvr_snat', 'dvr', 'fake_mode', 'legacy']
+ for host, agent_mode in zip(hosts, agent_modes):
+ self._register_l3_agent(host, agent_mode, self.plugin)
+
+ def _get_agent_mode(self, agent):
+ agent_conf = self.plugin.get_configuration_dict(agent)
+ return agent_conf.get('agent_mode', 'None')
+
+ def test_get_l3_agents(self):
+ l3_agents = self.plugin.get_l3_agents(
+ self.adminContext, filters={'agent_modes': self.agent_modes})
+ self.assertEqual(len(self.expected_agent_modes), len(l3_agents))
+ returned_agent_modes = [self._get_agent_mode(agent)
+ for agent in l3_agents]
+ self.assertEqual(self.expected_agent_modes, returned_agent_modes)