- get_agent_gateway_port
Needed by the agent when operating in DVR/DVR_SNAT mode
1.3 - Get the list of activated services
+ 1.4 - Added L3 HA update_router_state. This method was reworked in
+ 1.5 to update_ha_routers_states
+ 1.5 - Added update_ha_routers_states
"""
cctxt = self.client.prepare(version='1.3')
return cctxt.call(context, 'get_service_plugin_list')
+ def update_ha_routers_states(self, context, states):
+ """Update HA routers states."""
+ cctxt = self.client.prepare(version='1.5')
+ return cctxt.call(context, 'update_ha_routers_states',
+ host=self.host, states=states)
+
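For orientation, a minimal sketch of how an agent would invoke the new call; the topic constant, host name and router UUIDs below are illustrative, not taken from this patch:

    # Hypothetical agent-side usage (identifiers are illustrative).
    plugin_rpc = L3PluginApi(topics.L3PLUGIN, 'net-node-1')
    plugin_rpc.update_ha_routers_states(
        context, {'uuid-1': 'active', 'uuid-2': 'standby'})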
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
ha.AgentMixin,
from neutron.agent.linux import utils as agent_utils
from neutron.common import constants as l3_constants
from neutron.i18n import _LE, _LI
+from neutron.notifiers import batch_notifier
LOG = logging.getLogger(__name__)
def __init__(self, host):
self._init_ha_conf_path()
super(AgentMixin, self).__init__(host)
+ self.state_change_notifier = batch_notifier.BatchNotifier(
+ self._calculate_batch_duration(), self.notify_server)
eventlet.spawn(self._start_keepalived_notifications_server)
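BatchNotifier (from neutron.notifiers.batch_notifier) buffers events queued during an interval and hands them to its callback as a single list. A rough sketch of the contract relied on here, with an illustrative interval and a hypothetical callback:

    # handle_batch is a hypothetical callback taking a list of events.
    notifier = batch_notifier.BatchNotifier(16, handle_batch)
    notifier.queue_event(('uuid-1', 'master'))  # buffered, not sent yet
    notifier.queue_event(('uuid-2', 'backup'))  # coalesced into the batch
    # ~16 seconds later handle_batch() runs once with both events, so a
    # burst of keepalived transitions collapses into a single RPC.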
def _start_keepalived_notifications_server(self):
state_change_server = (
L3AgentKeepalivedStateChangeServer(self, self.conf))
state_change_server.run()
+ def _calculate_batch_duration(self):
+ # A backup becomes the master after 3 missed VRRP advertisements
+ detection_time = self.conf.ha_vrrp_advert_int * 3
+
+ # Keepalived takes a couple of seconds to configure the VIPs
+ configuration_time = 2
+
+ # Give it enough slack to batch all events due to the same failure
+ return (detection_time + configuration_time) * 2
+
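A worked example of the resulting window, assuming the default ha_vrrp_advert_int of 2 seconds (the option is configurable, so these numbers are only illustrative):

    # detection_time     = 2 * 3       = 6 seconds
    # configuration_time =               2 seconds
    # batch duration     = (6 + 2) * 2 = 16 seconds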
def enqueue_state_change(self, router_id, state):
LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
{'router_id': router_id,
'state': state})
self._update_metadata_proxy(router_id, state)
+ self.state_change_notifier.queue_event((router_id, state))
def _update_metadata_proxy(self, router_id, state):
try:
self.metadata_driver.destroy_monitored_metadata_proxy(
self.process_monitor, ri.router_id, ri.ns_name, self.conf)
+ def notify_server(self, batched_events):
+ translation_map = {'master': 'active',
+ 'backup': 'standby',
+ 'fault': 'standby'}
+ translated_states = dict((router_id, translation_map[state]) for
+ router_id, state in batched_events)
+ LOG.debug('Updating server with HA routers states %s',
+ translated_states)
+ self.plugin_rpc.update_ha_routers_states(
+ self.context, translated_states)
+
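keepalived reports 'master', 'backup' or 'fault', while the server-side schema only stores 'active' and 'standby'; a quick worked example of the mapping (router UUIDs are illustrative):

    # batched_events:    [('uuid-1', 'master'), ('uuid-2', 'fault')]
    # translated_states: {'uuid-1': 'active', 'uuid-2': 'standby'}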
def _init_ha_conf_path(self):
ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
agent_utils.ensure_dir(ha_full_path)
class L3RpcCallback(object):
"""L3 agent RPC callback in plugin implementations."""
- # 1.0 L3PluginApi BASE_RPC_API_VERSION
- # 1.1 Support update_floatingip_statuses
+ # 1.0 L3PluginApi BASE_RPC_API_VERSION
+ # 1.1 Support update_floatingip_statuses
# 1.2 Added methods for DVR support
# 1.3 Added a method that returns the list of activated services
# 1.4 Added L3 HA update_router_state. This method was later removed,
- # since it was unused. The RPC version was not changed.
- target = oslo_messaging.Target(version='1.4')
+ # since it was unused. The RPC version was not changed.
+ # 1.5 Added update_ha_routers_states
+ target = oslo_messaging.Target(version='1.5')
@property
def plugin(self):
'host %(host)s', {'agent_port': agent_port,
'host': host})
return agent_port
+
+ def update_ha_routers_states(self, context, **kwargs):
+ """Update states for HA routers.
+
+ Get a map of router_id to its HA state on a host and update the DB.
+ State must be in: ('active', 'standby').
+ """
+ states = kwargs.get('states')
+ host = kwargs.get('host')
+
+ LOG.debug('Updating HA routers states on host %s: %s', host, states)
+ self.l3plugin.update_routers_states(context, states, host)
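For reference, a sketch of the kwargs this handler receives from the agent-side call added above (host and UUIDs are illustrative):

    # {'host': 'net-node-1',
    #  'states': {'uuid-1': 'active', 'uuid-2': 'standby'}}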
router_ids,
active)
return self._process_sync_ha_data(context, sync_data, host)
+
+ @classmethod
+ def _set_router_states(cls, context, bindings, states):
+ for binding in bindings:
+ try:
+ with context.session.begin(subtransactions=True):
+ binding.state = states[binding.router_id]
+ except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
+ # Take concurrently deleted routers into account
+ pass
+
+ def update_routers_states(self, context, states, host):
+ """Receive dict of router ID to state and update them all."""
+
+ bindings = self.get_ha_router_port_bindings(
+ context, router_ids=states.keys(), host=host)
+ self._set_router_states(context, bindings, states)
external_port['mac_address'],
namespace=router.ns_name) for fip in floating_ips)
+ def fail_ha_router(self, router):
+ device_name = router.get_ha_device_name(
+ router.router[l3_constants.HA_INTERFACE_KEY]['id'])
+ ha_device = ip_lib.IPDevice(device_name, router.ns_name)
+ ha_device.link.set_down()
+
class L3AgentTestCase(L3AgentTestFramework):
def test_observer_notifications_legacy_router(self):
router = self.manage_router(self.agent, router_info)
utils.wait_until_true(lambda: router.ha_state == 'master')
- device_name = router.get_ha_device_name(
- router.router[l3_constants.HA_INTERFACE_KEY]['id'])
- ha_device = ip_lib.IPDevice(device_name, router.ns_name)
- ha_device.link.set_down()
+ self.fail_ha_router(router)
utils.wait_until_true(lambda: router.ha_state == 'backup')
utils.wait_until_true(lambda: enqueue_mock.call_count == 3)
self.assertEqual((router.router_id, 'master'), calls[1])
self.assertEqual((router.router_id, 'backup'), calls[2])
+ def _expected_rpc_report(self, expected):
+ calls = (args[0][1] for args in
+ self.agent.plugin_rpc.update_ha_routers_states.call_args_list)
+
+ # Get the last state reported for each router
+ actual_router_states = {}
+ for call in calls:
+ for router_id, state in call.iteritems():
+ actual_router_states[router_id] = state
+
+ return actual_router_states == expected
+
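A router may appear in several batched reports, and only its most recent state matters; for example (illustrative payloads):

    # calls:  [{'uuid-1': 'active'},
    #          {'uuid-1': 'standby', 'uuid-2': 'active'}]
    # result: {'uuid-1': 'standby', 'uuid-2': 'active'}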
+ def test_keepalived_state_change_bulk_rpc(self):
+ router_info = self.generate_router_info(enable_ha=True)
+ router1 = self.manage_router(self.agent, router_info)
+ self.fail_ha_router(router1)
+ router_info = self.generate_router_info(enable_ha=True)
+ router2 = self.manage_router(self.agent, router_info)
+
+ utils.wait_until_true(lambda: router1.ha_state == 'backup')
+ utils.wait_until_true(lambda: router2.ha_state == 'master')
+ utils.wait_until_true(
+ lambda: self._expected_rpc_report(
+ {router1.router_id: 'standby', router2.router_id: 'active'}))
+
def _test_observer_notifications(self, enable_ha):
"""Test create, update, delete of router and notifications."""
with mock.patch.object(
from neutron.agent.common import config as agent_config
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import config as l3_config
+from neutron.agent.l3 import ha as l3_ha_agent
from neutron.agent.metadata import driver as metadata_driver
from neutron.openstack.common import uuidutils
from neutron.tests import base
'._init_ha_conf_path').start()
cfg.CONF.register_opts(l3_config.OPTS)
+ cfg.CONF.register_opts(l3_ha_agent.OPTS)
cfg.CONF.register_opts(metadata_driver.MetadataDriver.OPTS)
def _test_spawn_metadata_proxy(self, expected_user, expected_group,
routers_after = self.plugin.get_routers(self.admin_ctx)
self.assertEqual(routers_before, routers_after)
+ def test_update_routers_states(self):
+ router1 = self._create_router()
+ self._bind_router(router1['id'])
+ router2 = self._create_router()
+ self._bind_router(router2['id'])
+
+ routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+ self.agent1['host'])
+ for router in routers:
+ self.assertEqual('standby', router[constants.HA_ROUTER_STATE_KEY])
+
+ states = {router1['id']: 'active',
+ router2['id']: 'standby'}
+ self.plugin.update_routers_states(
+ self.admin_ctx, states, self.agent1['host'])
+
+ routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+ self.agent1['host'])
+ for router in routers:
+ self.assertEqual(states[router['id']],
+ router[constants.HA_ROUTER_STATE_KEY])
+
+ def test_set_router_states_handles_concurrently_deleted_router(self):
+ router1 = self._create_router()
+ self._bind_router(router1['id'])
+ router2 = self._create_router()
+ self._bind_router(router2['id'])
+ bindings = self.plugin.get_ha_router_port_bindings(
+ self.admin_ctx, [router1['id'], router2['id']])
+ self.plugin.delete_router(self.admin_ctx, router1['id'])
+ self.plugin._set_router_states(
+ self.admin_ctx, bindings, {router1['id']: 'active',
+ router2['id']: 'active'})
+ routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+ self.agent1['host'])
+ self.assertEqual('active', routers[0][constants.HA_ROUTER_STATE_KEY])
+
def test_exclude_dvr_agents_for_ha_candidates(self):
"""Test dvr agents are not counted in the ha candidates.