]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Break coupling between ML2 and L3 during create/update operations
authorarmando-migliaccio <armamig@gmail.com>
Thu, 5 Feb 2015 16:40:04 +0000 (08:40 -0800)
committerarmando-migliaccio <armamig@gmail.com>
Fri, 13 Mar 2015 01:57:49 +0000 (18:57 -0700)
This is an initial patch in a series that, by using an event framework,
cleans up the relationship between ML2 and L3, so that they are no longer
tightly coupled. A follow-up will take address the coupling during the
port delete operation.

The newly introduced notification hooks not only benefit the L3 service
plugin, but any other plugin that has an interest in knowing about port
events.

Long term, the notification bits can move in a more 'common' place so that
other plugins can take advantage of them, but as mentioned in a parent patch,
the perestroika is not quite there yet.

Related-blueprint: services-split
Related-blueprint: plugin-interface-perestroika

Change-Id: I6527b1cb53a71a1f68329a7a3b1878094558f8c2

neutron/db/l3_dvrscheduler_db.py
neutron/plugins/ml2/plugin.py
neutron/plugins/ml2/rpc.py
neutron/services/l3_router/l3_router_plugin.py
neutron/tests/unit/ml2/test_ml2_plugin.py
neutron/tests/unit/ml2/test_rpcapi.py

index e367381be5326b46193760139906b13002fd28d5..e67c97f631b43455f4274ca6a39f5e310bbb0816 100644 (file)
@@ -22,6 +22,9 @@ from sqlalchemy import orm
 from sqlalchemy.orm import exc
 from sqlalchemy.orm import joinedload
 
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
 from neutron.common import constants as q_const
 from neutron.common import utils as n_utils
 from neutron.db import agents_db
@@ -29,6 +32,8 @@ from neutron.db import l3_agentschedulers_db as l3agent_sch_db
 from neutron.db import model_base
 from neutron.db import models_v2
 from neutron.i18n import _LI, _LW
+from neutron import manager
+from neutron.plugins.common import constants as service_constants
 from neutron.plugins.ml2 import db as ml2_db
 
 LOG = logging.getLogger(__name__)
@@ -309,3 +314,27 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
             self.bind_dvr_router_servicenode(
                 context, router_id, chosen_agent)
             return chosen_agent
+
+
+def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
+    LOG.debug('Received %s %s' % (resource, event))
+    port = kwargs['port']
+    if not port:
+        return
+
+    l3plugin = manager.NeutronManager.get_service_plugins().get(
+        service_constants.L3_ROUTER_NAT)
+    mac_address_updated = kwargs.get('mac_address_updated')
+    update_device_up = kwargs.get('update_device_up')
+    context = kwargs['context']
+    if mac_address_updated or update_device_up:
+        l3plugin.dvr_vmarp_table_update(context, port, "add")
+    if n_utils.is_dvr_serviced(port['device_owner']):
+        l3plugin.dvr_update_router_addvm(context, port)
+
+
+def subscribe():
+    registry.subscribe(
+        _notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE)
+    registry.subscribe(
+        _notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE)
index 80c0f053a43796dbc6c67069ff4428a9d687c676..99679c440fe38c5053454fe7baf9f8229186aa37 100644 (file)
@@ -33,6 +33,9 @@ from neutron.api.rpc.handlers import dvr_rpc
 from neutron.api.rpc.handlers import metadata_rpc
 from neutron.api.rpc.handlers import securitygroups_rpc
 from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
 from neutron.common import constants as const
 from neutron.common import exceptions as exc
 from neutron.common import ipv6_utils
@@ -161,20 +164,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 if self.type_manager.network_matches_filters(network, filters)
                 ]
 
-    def _notify_l3_agent_new_port(self, context, port):
-        if not port:
-            return
-
-        # Whenever a DVR serviceable port comes up on a
-        # node, it has to be communicated to the L3 Plugin
-        # and agent for creating the respective namespaces.
-        if (utils.is_dvr_serviced(port['device_owner'])):
-            l3plugin = manager.NeutronManager.get_service_plugins().get(
-                service_constants.L3_ROUTER_NAT)
-            if (utils.is_extension_supported(
-                l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
-                l3plugin.dvr_update_router_addvm(context, port)
-
     def _get_host_port_if_changed(self, mech_context, attrs):
         binding = mech_context._binding
         host = attrs and attrs.get(portbindings.HOST_ID)
@@ -936,7 +925,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         attrs = port[attributes.PORT]
         result, mech_context = self._create_port_db(context, port)
         new_host_port = self._get_host_port_if_changed(mech_context, attrs)
-        self._notify_l3_agent_new_port(context, new_host_port)
+        # notify any plugin that is interested in port create events
+        kwargs = {'context': context, 'port': new_host_port}
+        registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
 
         try:
             self.mechanism_manager.create_port_postcommit(mech_context)
@@ -972,7 +963,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             if attrs and attrs.get(portbindings.HOST_ID):
                 new_host_port = self._get_host_port_if_changed(
                     obj['mech_context'], attrs)
-                self._notify_l3_agent_new_port(context, new_host_port)
+                kwargs = {'context': context, 'port': new_host_port}
+                registry.notify(
+                    resources.PORT, events.AFTER_CREATE, self, **kwargs)
 
         try:
             for obj in objects:
@@ -990,11 +983,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
     def update_port(self, context, id, port):
         attrs = port[attributes.PORT]
         need_port_update_notify = False
-        l3plugin = manager.NeutronManager.get_service_plugins().get(
-            service_constants.L3_ROUTER_NAT)
-        is_dvr_enabled = utils.is_extension_supported(
-            l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
-
         session = context.session
 
         # REVISIT: Serialize this operation with a semaphore to
@@ -1034,10 +1022,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             self.mechanism_manager.update_port_precommit(mech_context)
 
         # Notifications must be sent after the above transaction is complete
-        if mac_address_updated and l3plugin and is_dvr_enabled:
-            # NOTE: "add" actually does a 'replace' operation
-            l3plugin.dvr_vmarp_table_update(context, updated_port, "add")
-        self._notify_l3_agent_new_port(context, new_host_port)
+        kwargs = {
+            'context': context,
+            'port': new_host_port,
+            'mac_address_updated': mac_address_updated,
+        }
+        registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
 
         # TODO(apech) - handle errors raised by update_port, potentially
         # by re-calling update_port with the previous attributes. For
index 8e1a55d9c76e1cdf6f4a0604a04ee5984eefe406..d46e7958915e514151b2706ea2bc3506b17582af 100644 (file)
@@ -19,15 +19,16 @@ from sqlalchemy.orm import exc
 
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.handlers import dvr_rpc
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
 from neutron.common import constants as q_const
 from neutron.common import exceptions
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
-from neutron.common import utils
 from neutron.extensions import portbindings
 from neutron.i18n import _LW
 from neutron import manager
-from neutron.plugins.common import constants as service_constants
 from neutron.plugins.ml2 import driver_api as api
 from neutron.plugins.ml2.drivers import type_tunnel
 # REVISIT(kmestery): Allow the type and mechanism drivers to supply the
@@ -167,16 +168,21 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
         port_id = plugin.update_port_status(rpc_context, port_id,
                                             q_const.PORT_STATUS_ACTIVE,
                                             host)
-        l3plugin = manager.NeutronManager.get_service_plugins().get(
-            service_constants.L3_ROUTER_NAT)
-        if (l3plugin and
-            utils.is_extension_supported(l3plugin,
-                                         q_const.L3_DISTRIBUTED_EXT_ALIAS)):
-            try:
-                port = plugin._get_port(rpc_context, port_id)
-                l3plugin.dvr_vmarp_table_update(rpc_context, port, "add")
-            except exceptions.PortNotFound:
-                LOG.debug('Port %s not found during ARP update', port_id)
+        try:
+            # NOTE(armax): it's best to remove all objects from the
+            # session, before we try to retrieve the new port object
+            rpc_context.session.expunge_all()
+            port = plugin._get_port(rpc_context, port_id)
+        except exceptions.PortNotFound:
+            LOG.debug('Port %s not found during update', port_id)
+        else:
+            kwargs = {
+                'context': rpc_context,
+                'port': port,
+                'update_device_up': True
+            }
+            registry.notify(
+                resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
 
 
 class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
index 57faf6028c2a1902a44cbacc055eb1c56768f1bf..85ad1f86b54744932dd883d13ac5bac6f77ca0fa 100644 (file)
@@ -56,6 +56,8 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
             cfg.CONF.router_scheduler_driver)
         self.start_periodic_l3_agent_status_check()
         super(L3RouterPlugin, self).__init__()
+        if 'dvr' in self.supported_extension_aliases:
+            l3_dvrscheduler_db.subscribe()
 
     def setup_rpc(self):
         # RPC support
index 2ae3f7476bf874313d4f10049623c4521043e7e6..3e87842a702888b7f989cb740cfdac29ea4c0827 100644 (file)
@@ -1251,6 +1251,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
     def setUp(self):
         super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
         self.context = mock.MagicMock()
+        self.notify_p = mock.patch('neutron.callbacks.registry.notify')
+        self.notify = self.notify_p.start()
 
     def _ensure_transaction_is_closed(self):
         transaction = self.context.session.begin(subtransactions=True)
@@ -1268,9 +1270,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
             return_value=new_host_port)
         plugin._check_mac_update_allowed = mock.Mock(return_value=True)
 
-        plugin._notify_l3_agent_new_port = mock.Mock()
-        plugin._notify_l3_agent_new_port.side_effect = (
-            lambda c, p: self._ensure_transaction_is_closed())
+        self.notify.side_effect = (
+            lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
 
         return plugin
 
@@ -1286,33 +1287,28 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
 
             plugin.create_port(self.context, mock.MagicMock())
 
-            plugin._notify_l3_agent_new_port.assert_called_once_with(
-                self.context, new_host_port)
+            kwargs = {'context': self.context, 'port': new_host_port}
+            self.notify.assert_called_once_with('port', 'after_create',
+                plugin, **kwargs)
 
     def test_update_port_rpc_outside_transaction(self):
         with contextlib.nested(
             mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
             mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
-            mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
-        ) as (init, super_update_port, get_service_plugins):
+        ) as (init, super_update_port):
             init.return_value = None
-            l3plugin = mock.Mock()
-            l3plugin.supported_extension_aliases = [
-                constants.L3_DISTRIBUTED_EXT_ALIAS,
-            ]
-            get_service_plugins.return_value = {
-                service_constants.L3_ROUTER_NAT: l3plugin,
-            }
-
             new_host_port = mock.Mock()
             plugin = self._create_plugin_for_create_update_port(new_host_port)
 
             plugin.update_port(self.context, 'fake_id', mock.MagicMock())
 
-            plugin._notify_l3_agent_new_port.assert_called_once_with(
-                self.context, new_host_port)
-            l3plugin.dvr_vmarp_table_update.assert_called_once_with(
-                self.context, mock.ANY, "add")
+            kwargs = {
+                'context': self.context,
+                'port': new_host_port,
+                'mac_address_updated': True,
+            }
+            self.notify.assert_called_once_with('port', 'after_update',
+                plugin, **kwargs)
 
     def test_vmarp_table_update_outside_of_delete_transaction(self):
         l3plugin = mock.Mock()
index efb1dbd57847f65747feb108b2df5f239ae0e51d..a22445cafa0b131a39786c02b496b1b83e7093bd 100644 (file)
@@ -44,45 +44,32 @@ class RpcCallbacksTestCase(base.BaseTestCase):
                                                  self.type_manager)
         self.manager = mock.patch.object(
             plugin_rpc.manager, 'NeutronManager').start()
-        self.l3plugin = mock.Mock()
-        self.manager.get_service_plugins.return_value = {
-            'L3_ROUTER_NAT': self.l3plugin
-        }
         self.plugin = self.manager.get_plugin()
 
-    def _test_update_device_up(self, extensions, kwargs):
-        with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
-                        '._device_to_port_id'):
-            type(self.l3plugin).supported_extension_aliases = (
-                mock.PropertyMock(return_value=extensions))
-            self.callbacks.update_device_up(mock.ANY, **kwargs)
-
-    def test_update_device_up_without_dvr(self):
+    def _test_update_device_up(self):
         kwargs = {
             'agent_id': 'foo_agent',
             'device': 'foo_device'
         }
-        self._test_update_device_up(['router'], kwargs)
-        self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count)
+        with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
+                        '._device_to_port_id'):
+            with mock.patch('neutron.callbacks.registry.notify') as notify:
+                self.callbacks.update_device_up(mock.Mock(), **kwargs)
+                return notify
 
-    def test_update_device_up_with_dvr(self):
+    def test_update_device_up_notify(self):
+        notify = self._test_update_device_up()
         kwargs = {
-            'agent_id': 'foo_agent',
-            'device': 'foo_device'
+            'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True
         }
-        self._test_update_device_up(['router', 'dvr'], kwargs)
-        self.l3plugin.dvr_vmarp_table_update.assert_called_once_with(
-            mock.ANY, mock.ANY, 'add')
+        notify.assert_called_once_with(
+            'port', 'after_update', self.plugin, **kwargs)
 
-    def test_update_device_up_with_dvr_when_port_not_found(self):
-        kwargs = {
-            'agent_id': 'foo_agent',
-            'device': 'foo_device'
-        }
-        self.l3plugin.dvr_vmarp_table_update.side_effect = (
+    def test_update_device_up_notify_not_sent_with_port_not_found(self):
+        self.plugin._get_port.side_effect = (
             exceptions.PortNotFound(port_id='foo_port_id'))
-        self._test_update_device_up(['router', 'dvr'], kwargs)
-        self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count)
+        notify = self._test_update_device_up()
+        self.assertFalse(notify.call_count)
 
     def test_get_device_details_without_port_context(self):
         self.plugin.get_bound_port_context.return_value = None