]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Send notification to controller about HA router state change
authorAssaf Muller <amuller@redhat.com>
Sun, 28 Sep 2014 11:26:42 +0000 (14:26 +0300)
committerAssaf Muller <amuller@redhat.com>
Fri, 20 Mar 2015 22:03:59 +0000 (18:03 -0400)
The L3 agent gets keepalived state change notifications via
a unix domain socket. These events are now batched and
sent out as a single RPC to the server. In case the same
router got updated multiple times during the batch period,
only the latest state is sent.

Partially-Implements: blueprint report-ha-router-master
Change-Id: I36834ad3d9e8a49a702f01acc29c7c38f2d48833

neutron/agent/l3/agent.py
neutron/agent/l3/ha.py
neutron/api/rpc/handlers/l3_rpc.py
neutron/db/l3_hamode_db.py
neutron/tests/functional/agent/test_l3_agent.py
neutron/tests/unit/agent/metadata/test_driver.py
neutron/tests/unit/db/test_l3_ha_db.py

index 99998d66756cc40584c2713387116c8bca0da388..c39d3a1b6d704b6318a6ee9a16079212c3c0cd9f 100644 (file)
@@ -73,6 +73,9 @@ class L3PluginApi(object):
               - get_agent_gateway_port
               Needed by the agent when operating in DVR/DVR_SNAT mode
         1.3 - Get the list of activated services
+        1.4 - Added L3 HA update_router_state. This method was reworked
+              into update_ha_routers_states
+        1.5 - Added update_ha_routers_states
 
     """
 
@@ -120,6 +123,12 @@ class L3PluginApi(object):
         cctxt = self.client.prepare(version='1.3')
         return cctxt.call(context, 'get_service_plugin_list')
 
+    def update_ha_routers_states(self, context, states):
+        """Update HA routers states."""
+        cctxt = self.client.prepare(version='1.5')
+        return cctxt.call(context, 'update_ha_routers_states',
+                          host=self.host, states=states)
+
 
 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                  ha.AgentMixin,
index 7f75864d767aa21a9326ed288a2b4779b0f1aa04..66a61913f358a522a6ea1a068db20220b32a175d 100644 (file)
@@ -24,6 +24,7 @@ from neutron.agent.linux import keepalived
 from neutron.agent.linux import utils as agent_utils
 from neutron.common import constants as l3_constants
 from neutron.i18n import _LE, _LI
+from neutron.notifiers import batch_notifier
 
 LOG = logging.getLogger(__name__)
 
@@ -91,6 +92,8 @@ class AgentMixin(object):
     def __init__(self, host):
         self._init_ha_conf_path()
         super(AgentMixin, self).__init__(host)
+        self.state_change_notifier = batch_notifier.BatchNotifier(
+            self._calculate_batch_duration(), self.notify_server)
         eventlet.spawn(self._start_keepalived_notifications_server)
 
     def _start_keepalived_notifications_server(self):
@@ -98,11 +101,22 @@ class AgentMixin(object):
             L3AgentKeepalivedStateChangeServer(self, self.conf))
         state_change_server.run()
 
+    def _calculate_batch_duration(self):
+        # Slave becomes the master after not hearing from it 3 times
+        detection_time = self.conf.ha_vrrp_advert_int * 3
+
+        # Keepalived takes a couple of seconds to configure the VIPs
+        configuration_time = 2
+
+        # Give it enough slack to batch all events due to the same failure
+        return (detection_time + configuration_time) * 2
+
     def enqueue_state_change(self, router_id, state):
         LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
                  {'router_id': router_id,
                   'state': state})
         self._update_metadata_proxy(router_id, state)
+        self.state_change_notifier.queue_event((router_id, state))
 
     def _update_metadata_proxy(self, router_id, state):
         try:
@@ -122,6 +136,17 @@ class AgentMixin(object):
             self.metadata_driver.destroy_monitored_metadata_proxy(
                 self.process_monitor, ri.router_id, ri.ns_name, self.conf)
 
+    def notify_server(self, batched_events):
+        translation_map = {'master': 'active',
+                           'backup': 'standby',
+                           'fault': 'standby'}
+        translated_states = dict((router_id, translation_map[state]) for
+                                 router_id, state in batched_events)
+        LOG.debug('Updating server with HA routers states %s',
+                  translated_states)
+        self.plugin_rpc.update_ha_routers_states(
+            self.context, translated_states)
+
     def _init_ha_conf_path(self):
         ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
         agent_utils.ensure_dir(ha_full_path)
index 5865eb7cf4c9f04edaf7ea64c0914542d00218ec..9e2d47ed0021d7340b639f55774ac87add13f5a8 100644 (file)
@@ -35,13 +35,14 @@ LOG = logging.getLogger(__name__)
 class L3RpcCallback(object):
     """L3 agent RPC callback in plugin implementations."""
 
-    # 1.0  L3PluginApi BASE_RPC_API_VERSION
-    # 1.1  Support update_floatingip_statuses
+    # 1.0 L3PluginApi BASE_RPC_API_VERSION
+    # 1.1 Support update_floatingip_statuses
     # 1.2 Added methods for DVR support
     # 1.3 Added a method that returns the list of activated services
     # 1.4 Added L3 HA update_router_state. This method was later removed,
-    #     since it was unused. The RPC version was not changed.
-    target = oslo_messaging.Target(version='1.4')
+    #     since it was unused. The RPC version was not changed
+    # 1.5 Added update_ha_routers_states
+    target = oslo_messaging.Target(version='1.5')
 
     @property
     def plugin(self):
@@ -209,3 +210,15 @@ class L3RpcCallback(object):
                   'host %(host)s', {'agent_port': agent_port,
                   'host': host})
         return agent_port
+
+    def update_ha_routers_states(self, context, **kwargs):
+        """Update states for HA routers.
+
+        Get a map of router_id to its HA state on a host and update the DB.
+        State must be in: ('active', 'standby').
+        """
+        states = kwargs.get('states')
+        host = kwargs.get('host')
+
+        LOG.debug('Updating HA routers states on host %s: %s', host, states)
+        self.l3plugin.update_routers_states(context, states, host)
index 1c0a6e59576675f162a8a975bbd61e91db0d5d15..a7ddfdadc7daa03815d4db3b3e390522bca02dc0 100644 (file)
@@ -462,3 +462,20 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
                                                                   router_ids,
                                                                   active)
         return self._process_sync_ha_data(context, sync_data, host)
+
+    @classmethod
+    def _set_router_states(cls, context, bindings, states):
+        for binding in bindings:
+            try:
+                with context.session.begin(subtransactions=True):
+                    binding.state = states[binding.router_id]
+            except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
+                # Take concurrently deleted routers into account
+                pass
+
+    def update_routers_states(self, context, states, host):
+        """Receive dict of router ID to state and update them all."""
+
+        bindings = self.get_ha_router_port_bindings(
+            context, router_ids=states.keys(), host=host)
+        self._set_router_states(context, bindings, states)
index a822eb2640e6d7449772595fc8e72ae576c6be86..729b083605b1b5f02723e0eebd56790161e80400 100755 (executable)
@@ -271,6 +271,12 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
             external_port['mac_address'],
             namespace=router.ns_name) for fip in floating_ips)
 
+    def fail_ha_router(self, router):
+        device_name = router.get_ha_device_name(
+            router.router[l3_constants.HA_INTERFACE_KEY]['id'])
+        ha_device = ip_lib.IPDevice(device_name, router.ns_name)
+        ha_device.link.set_down()
+
 
 class L3AgentTestCase(L3AgentTestFramework):
     def test_observer_notifications_legacy_router(self):
@@ -286,10 +292,7 @@ class L3AgentTestCase(L3AgentTestFramework):
         router = self.manage_router(self.agent, router_info)
         utils.wait_until_true(lambda: router.ha_state == 'master')
 
-        device_name = router.get_ha_device_name(
-            router.router[l3_constants.HA_INTERFACE_KEY]['id'])
-        ha_device = ip_lib.IPDevice(device_name, router.ns_name)
-        ha_device.link.set_down()
+        self.fail_ha_router(router)
         utils.wait_until_true(lambda: router.ha_state == 'backup')
 
         utils.wait_until_true(lambda: enqueue_mock.call_count == 3)
@@ -298,6 +301,31 @@ class L3AgentTestCase(L3AgentTestFramework):
         self.assertEqual((router.router_id, 'master'), calls[1])
         self.assertEqual((router.router_id, 'backup'), calls[2])
 
+    def _expected_rpc_report(self, expected):
+        calls = (args[0][1] for args in
+                 self.agent.plugin_rpc.update_ha_routers_states.call_args_list)
+
+        # Get the last state reported for each router
+        actual_router_states = {}
+        for call in calls:
+            for router_id, state in call.iteritems():
+                actual_router_states[router_id] = state
+
+        return actual_router_states == expected
+
+    def test_keepalived_state_change_bulk_rpc(self):
+        router_info = self.generate_router_info(enable_ha=True)
+        router1 = self.manage_router(self.agent, router_info)
+        self.fail_ha_router(router1)
+        router_info = self.generate_router_info(enable_ha=True)
+        router2 = self.manage_router(self.agent, router_info)
+
+        utils.wait_until_true(lambda: router1.ha_state == 'backup')
+        utils.wait_until_true(lambda: router2.ha_state == 'master')
+        utils.wait_until_true(
+            lambda: self._expected_rpc_report(
+                {router1.router_id: 'standby', router2.router_id: 'active'}))
+
     def _test_observer_notifications(self, enable_ha):
         """Test create, update, delete of router and notifications."""
         with mock.patch.object(
index 4f74cde1bcb6f2f751c8ee674be36278f11b9d77..025b78a414a66c0a3eb8b210816587decb2b7f82 100644 (file)
@@ -22,6 +22,7 @@ from oslo_config import cfg
 from neutron.agent.common import config as agent_config
 from neutron.agent.l3 import agent as l3_agent
 from neutron.agent.l3 import config as l3_config
+from neutron.agent.l3 import ha as l3_ha_agent
 from neutron.agent.metadata import driver as metadata_driver
 from neutron.openstack.common import uuidutils
 from neutron.tests import base
@@ -74,6 +75,7 @@ class TestMetadataDriverProcess(base.BaseTestCase):
                    '._init_ha_conf_path').start()
 
         cfg.CONF.register_opts(l3_config.OPTS)
+        cfg.CONF.register_opts(l3_ha_agent.OPTS)
         cfg.CONF.register_opts(metadata_driver.MetadataDriver.OPTS)
 
     def _test_spawn_metadata_proxy(self, expected_user, expected_group,
index b35135169790a4d90525ff0d9a50713678051319..b1bbf3e978957cb6d0ea88c8824beb3f2b5c0869 100644 (file)
@@ -387,6 +387,43 @@ class L3HATestCase(L3HATestFramework):
         routers_after = self.plugin.get_routers(self.admin_ctx)
         self.assertEqual(routers_before, routers_after)
 
+    def test_update_routers_states(self):
+        router1 = self._create_router()
+        self._bind_router(router1['id'])
+        router2 = self._create_router()
+        self._bind_router(router2['id'])
+
+        routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+                                                        self.agent1['host'])
+        for router in routers:
+            self.assertEqual('standby', router[constants.HA_ROUTER_STATE_KEY])
+
+        states = {router1['id']: 'active',
+                  router2['id']: 'standby'}
+        self.plugin.update_routers_states(
+            self.admin_ctx, states, self.agent1['host'])
+
+        routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+                                                        self.agent1['host'])
+        for router in routers:
+            self.assertEqual(states[router['id']],
+                             router[constants.HA_ROUTER_STATE_KEY])
+
+    def test_set_router_states_handles_concurrently_deleted_router(self):
+        router1 = self._create_router()
+        self._bind_router(router1['id'])
+        router2 = self._create_router()
+        self._bind_router(router2['id'])
+        bindings = self.plugin.get_ha_router_port_bindings(
+            self.admin_ctx, [router1['id'], router2['id']])
+        self.plugin.delete_router(self.admin_ctx, router1['id'])
+        self.plugin._set_router_states(
+            self.admin_ctx, bindings, {router1['id']: 'active',
+                                       router2['id']: 'active'})
+        routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
+                                                        self.agent1['host'])
+        self.assertEqual('active', routers[0][constants.HA_ROUTER_STATE_KEY])
+
     def test_exclude_dvr_agents_for_ha_candidates(self):
         """Test dvr agents are not counted in the ha candidates.