]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix l2 pop doesn't propagate ip address updates
authorSylvain Afchain <sylvain.afchain@enovance.com>
Thu, 26 Sep 2013 14:57:58 +0000 (16:57 +0200)
committerSylvain Afchain <sylvain.afchain@enovance.com>
Thu, 26 Sep 2013 03:20:17 +0000 (05:20 +0200)
Propagates ip address changes when an ip address is :
added, removed, or changed.
Add a new rpc call for the updates of forwarding informations.

Fixes: Bug #1234137
Change-Id: Ib5b971bd02f20a0ea73f88ce9685e944226bb5a2

neutron/agent/l2population_rpc.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/ml2/drivers/l2pop/mech_driver.py
neutron/plugins/ml2/drivers/l2pop/rpc.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py
neutron/tests/unit/ml2/drivers/test_l2population.py

index c9d131aaf3df7dc318bb6acda71fbbd9861fbf13..f108c4ebdcd55e66b91a3ec71c0838ff1c1376b4 100644 (file)
@@ -37,6 +37,11 @@ class L2populationRpcCallBackMixin(object):
         if not host or host == cfg.CONF.host:
             self.fdb_remove(context, fdb_entries)
 
+    @log.log
+    def update_fdb_entries(self, context, fdb_entries, host=None):
+        if not host or host == cfg.CONF.host:
+            self.fdb_update(context, fdb_entries)
+
     @abc.abstractmethod
     def fdb_add(self, context, fdb_entries):
         pass
@@ -44,3 +49,7 @@ class L2populationRpcCallBackMixin(object):
     @abc.abstractmethod
     def fdb_remove(self, context, fdb_entries):
         pass
+
+    @abc.abstractmethod
+    def fdb_update(self, context, fdb_entries):
+        pass
index 549a08c9e64f2b427ea4511015030a3ff2c0f72d..810b4583db007c4568b442af02aed765fdc066ca 100755 (executable)
@@ -714,6 +714,40 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                                      ports,
                                                      interface)
 
+    def _fdb_chg_ip(self, context, fdb_entries):
+        LOG.debug(_("update chg_ip received"))
+        for network_id, agent_ports in fdb_entries.items():
+            segment = self.agent.br_mgr.network_map.get(network_id)
+            if not segment:
+                return
+
+            if segment.network_type != lconst.TYPE_VXLAN:
+                return
+
+            interface = self.agent.br_mgr.get_vxlan_device_name(
+                segment.segmentation_id)
+
+            for agent_ip, state in agent_ports.items():
+                if agent_ip == self.agent.br_mgr.local_ip:
+                    continue
+
+                after = state.get('after')
+                for mac, ip in after:
+                    self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)
+
+                before = state.get('before')
+                for mac, ip in before:
+                    self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)
+
+    def fdb_update(self, context, fdb_entries):
+        LOG.debug(_("fdb_update received"))
+        for action, values in fdb_entries.items():
+            method = '_fdb_' + action
+            if not hasattr(self, method):
+                raise NotImplementedError()
+
+            getattr(self, method)(context, values)
+
     def create_rpc_dispatcher(self):
         '''Get the rpc dispatcher for this manager.
 
index 8ab92da25f477c68761d7ab4349588e59314648b..a3f54849da08916fe96c7120c272508092ab3bdb 100644 (file)
@@ -36,6 +36,7 @@ class L2populationMechanismDriver(api.MechanismDriver,
 
     def initialize(self):
         LOG.debug(_("Experimental L2 population driver"))
+        self.rpc_ctx = n_context.get_admin_context_without_session()
 
     def _get_port_fdb_entries(self, port):
         return [[port['mac_address'],
@@ -45,31 +46,64 @@ class L2populationMechanismDriver(api.MechanismDriver,
         self.remove_fdb_entries = self._update_port_down(context)
 
     def delete_port_postcommit(self, context):
-        self._notify_remove_fdb_entries(context,
-                                        self.remove_fdb_entries)
-
-    def _notify_remove_fdb_entries(self, context, fdb_entries):
-        rpc_ctx = n_context.get_admin_context_without_session()
         l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
-            rpc_ctx, fdb_entries)
+            self.rpc_ctx, self.remove_fdb_entries)
+
+    def _get_diff_ips(self, orig, port):
+        orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
+        port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])
+
+        # check if an ip has been added or removed
+        orig_chg_ips = orig_ips.difference(port_ips)
+        port_chg_ips = port_ips.difference(orig_ips)
+
+        if orig_chg_ips or port_chg_ips:
+            return orig_chg_ips, port_chg_ips
+
+    def _fixed_ips_changed(self, context, orig, port):
+        diff_ips = self._get_diff_ips(orig, port)
+        if not diff_ips:
+            return
+        orig_ips, port_ips = diff_ips
+
+        port_infos = self._get_port_infos(context, orig)
+        if not port_infos:
+            return
+        agent, agent_ip, segment, port_fdb_entries = port_infos
+
+        orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips]
+        port_mac_ip = [[port['mac_address'], ip] for ip in port_ips]
+
+        upd_fdb_entries = {port['network_id']: {agent_ip: {}}}
+
+        ports = upd_fdb_entries[port['network_id']][agent_ip]
+        if orig_mac_ip:
+            ports['before'] = orig_mac_ip
+
+        if port_mac_ip:
+            ports['after'] = port_mac_ip
+
+        l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
+            self.rpc_ctx, {'chg_ip': upd_fdb_entries})
+
+        return True
 
     def update_port_postcommit(self, context):
         port = context.current
         orig = context.original
 
         if port['status'] == orig['status']:
-            return
+            self._fixed_ips_changed(context, orig, port)
 
         if port['status'] == const.PORT_STATUS_ACTIVE:
             self._update_port_up(context)
         elif port['status'] == const.PORT_STATUS_DOWN:
             fdb_entries = self._update_port_down(context)
-            self._notify_remove_fdb_entries(context, fdb_entries)
+            l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+                self.rpc_ctx, fdb_entries)
 
-    def _update_port_up(self, context):
-        port_context = context.current
-        network_id = port_context['network_id']
-        agent_host = port_context['binding:host_id']
+    def _get_port_infos(self, context, port):
+        agent_host = port['binding:host_id']
         if not agent_host:
             return
 
@@ -80,26 +114,39 @@ class L2populationMechanismDriver(api.MechanismDriver,
 
         agent_ip = self.get_agent_ip(agent)
         if not agent_ip:
-            LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
-                        agent_host)
+            LOG.warning(_("Unable to retrieve the agent ip, check the agent "
+                          "configuration."))
             return
 
         segment = context.bound_segment
         if not segment:
             LOG.warning(_("Port %(port)s updated by agent %(agent)s "
                           "isn't bound to any segment"),
-                        {'port': port_context['id'], 'agent': agent.host})
+                        {'port': port['id'], 'agent': agent})
             return
 
         tunnel_types = self.get_agent_tunnel_types(agent)
         if segment['network_type'] not in tunnel_types:
             return
 
+        fdb_entries = self._get_port_fdb_entries(port)
+
+        return agent, agent_ip, segment, fdb_entries
+
+    def _update_port_up(self, context):
+        port_context = context.current
+        port_infos = self._get_port_infos(context, port_context)
+        if not port_infos:
+            return
+        agent, agent_ip, segment, port_fdb_entries = port_infos
+
+        agent_host = port_context['binding:host_id']
+        network_id = port_context['network_id']
+
+        session = db_api.get_session()
         agent_ports = self.get_agent_network_port_count(session, agent_host,
                                                         network_id)
 
-        rpc_ctx = n_context.get_admin_context_without_session()
-
         other_fdb_entries = {network_id:
                              {'segment_id': segment['segmentation_id'],
                               'network_type': segment['network_type'],
@@ -138,45 +185,25 @@ class L2populationMechanismDriver(api.MechanismDriver,
 
             if ports.keys():
                 l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
-                    rpc_ctx, agent_fdb_entries, agent_host)
+                    self.rpc_ctx, agent_fdb_entries, agent_host)
 
         # Notify other agents to add fdb rule for current port
-        fdb_entries = self._get_port_fdb_entries(port_context)
-        other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+        other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
 
-        l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
+        l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
                                                           other_fdb_entries)
 
     def _update_port_down(self, context):
         port_context = context.current
-        network_id = port_context['network_id']
+        port_infos = self._get_port_infos(context, port_context)
+        if not port_infos:
+            return
+        agent, agent_ip, segment, port_fdb_entries = port_infos
 
         agent_host = port_context['binding:host_id']
-        if not agent_host:
-            return
+        network_id = port_context['network_id']
 
         session = db_api.get_session()
-        agent = self.get_agent_by_host(session, agent_host)
-        if not agent:
-            return
-
-        agent_ip = self.get_agent_ip(agent)
-        if not agent_ip:
-            LOG.warning(_("Unable to retrieve the agent ip, check the agent "
-                          "configuration."))
-            return
-
-        segment = context.bound_segment
-        if not segment:
-            LOG.warning(_("Port %(port)s updated by agent %(agent)s "
-                          "isn't bound to any segment"),
-                        {'port': port_context['id'], 'agent': agent})
-            return
-
-        tunnel_types = self.get_agent_tunnel_types(agent)
-        if segment['network_type'] not in tunnel_types:
-            return
-
         agent_ports = self.get_agent_network_port_count(session, agent_host,
                                                         network_id)
 
index 176f1182059e839f7b46d58a45dc1cc0aa5aa439..b29717165e258029f5a4fa8a01fe9b5a8887e95e 100644 (file)
@@ -76,4 +76,13 @@ class L2populationAgentNotifyAPI(proxy.RpcProxy):
                 self._notification_fanout(context, 'remove_fdb_entries',
                                           fdb_entries)
 
+    def update_fdb_entries(self, context, fdb_entries, host=None):
+        if fdb_entries:
+            if host:
+                self._notification_host(context, 'update_fdb_entries',
+                                        fdb_entries, host)
+            else:
+                self._notification_fanout(context, 'update_fdb_entries',
+                                          fdb_entries)
+
 L2populationAgentNotify = L2populationAgentNotifyAPI()
index eefe384367fda0000ddc115b872292b30b452a64..0a019c30e04af71883929bc5d23c1d9cc9aa371d 100644 (file)
@@ -412,6 +412,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                      dl_vlan=lvm.vlan,
                                      dl_dst=port_info[0])
 
+    def fdb_update(self, context, fdb_entries):
+        LOG.debug(_("fdb_update received"))
+        for action, values in fdb_entries.items():
+            method = '_fdb_' + action
+            if not hasattr(self, method):
+                raise NotImplementedError()
+
+            getattr(self, method)(context, values)
+
     def create_rpc_dispatcher(self):
         '''Get the rpc dispatcher for this manager.
 
index fd40f74d2112e180be11d1d4f7e517704df43d93..27fe6a659405b25dfbcf875ac20aa83f77aa0705 100644 (file)
@@ -878,3 +878,26 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
                           check_exit_code=False),
             ]
             execute_fn.assert_has_calls(expected)
+
+    def test_fdb_update_chg_ip(self):
+        fdb_entries = {'chg_ip':
+                       {'net_id':
+                        {'agent_ip':
+                         {'before': [['port_mac', 'port_ip_1']],
+                          'after': [['port_mac', 'port_ip_2']]}}}}
+
+        with mock.patch.object(utils, 'execute',
+                               return_value='') as execute_fn:
+            self.lb_rpc.fdb_update(None, fdb_entries)
+
+            expected = [
+                mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr',
+                           'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'],
+                          root_helper=self.root_helper,
+                          check_exit_code=False),
+                mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr',
+                           'port_mac', 'dev', 'vxlan-1'],
+                          root_helper=self.root_helper,
+                          check_exit_code=False)
+            ]
+            execute_fn.assert_has_calls(expected)
index 718926a6e58e0ad9236c7f7ece09e2bd9e7d4cf9..57a0a2bbe20abd6093c5a88a2bd264813d17173c 100644 (file)
@@ -406,3 +406,80 @@ class TestL2PopulationRpcTestCase(test_plugin.NeutronDbPluginV2TestCase):
 
             self.mock_fanout.assert_any_call(
                 mock.ANY, expected, topic=self.fanout_topic)
+
+    def test_fixed_ips_changed(self):
+        self._register_ml2_agents()
+
+        with self.subnet(network=self._network) as subnet:
+            host_arg = {portbindings.HOST_ID: HOST}
+            with self.port(subnet=subnet, cidr='10.0.0.0/24',
+                           arg_list=(portbindings.HOST_ID,),
+                           **host_arg) as port1:
+                p1 = port1['port']
+
+                self.mock_fanout.reset_mock()
+
+                data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
+                                               {'ip_address': '10.0.0.10'}]}}
+                req = self.new_update_request('ports', data, p1['id'])
+                res = self.deserialize(self.fmt, req.get_response(self.api))
+                ips = res['port']['fixed_ips']
+                self.assertEqual(len(ips), 2)
+
+                add_expected = {'args':
+                                {'fdb_entries':
+                                 {'chg_ip':
+                                  {p1['network_id']:
+                                   {'20.0.0.1':
+                                    {'after': [[p1['mac_address'],
+                                                '10.0.0.10']]}}}}},
+                                'namespace': None,
+                                'method': 'update_fdb_entries'}
+
+                self.mock_fanout.assert_any_call(
+                    mock.ANY, add_expected, topic=self.fanout_topic)
+
+                self.mock_fanout.reset_mock()
+
+                data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
+                                               {'ip_address': '10.0.0.16'}]}}
+                req = self.new_update_request('ports', data, p1['id'])
+                res = self.deserialize(self.fmt, req.get_response(self.api))
+                ips = res['port']['fixed_ips']
+                self.assertEqual(len(ips), 2)
+
+                upd_expected = {'args':
+                                {'fdb_entries':
+                                 {'chg_ip':
+                                  {p1['network_id']:
+                                   {'20.0.0.1':
+                                    {'before': [[p1['mac_address'],
+                                                 '10.0.0.10']],
+                                     'after': [[p1['mac_address'],
+                                                '10.0.0.16']]}}}}},
+                                'namespace': None,
+                                'method': 'update_fdb_entries'}
+
+                self.mock_fanout.assert_any_call(
+                    mock.ANY, upd_expected, topic=self.fanout_topic)
+
+                self.mock_fanout.reset_mock()
+
+                data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}}
+                req = self.new_update_request('ports', data, p1['id'])
+                res = self.deserialize(self.fmt, req.get_response(self.api))
+                ips = res['port']['fixed_ips']
+                self.assertEqual(len(ips), 1)
+
+                del_expected = {'args':
+                                {'fdb_entries':
+                                 {'chg_ip':
+                                  {p1['network_id']:
+                                   {'20.0.0.1':
+                                    {'before': [[p1['mac_address'],
+                                                 '10.0.0.2']]}}}}},
+                                'namespace': None,
+                                'method': 'update_fdb_entries'}
+
+                self.mock_fanout.assert_any_call(
+                    mock.ANY, del_expected, topic=self.fanout_topic)