]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Move postcommit ops out of transaction for bulk
authorMohammad Banikazemi <mb@us.ibm.com>
Sat, 6 Sep 2014 14:24:01 +0000 (10:24 -0400)
committerMohammad Banikazemi <mb@us.ibm.com>
Wed, 7 Jan 2015 18:03:13 +0000 (13:03 -0500)
Currently, the bulk create operations in ML2 are executed in
a transaction. This means all precommit and postcommit operations
for such operations are in a transaction. Postcommit operations
are expected to be executed outside of transactions as they may
communicate with a backend and introduce substantial delays. This
fix removes the postcommit operations from the transaction for
bulk create network/subnet/port operations.

Change-Id: I9a9683058088e50d9443040223232bf5e1396ccf
Closes-Bug: #1193861

neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
neutron/plugins/ml2/managers.py
neutron/plugins/ml2/plugin.py
neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py
neutron/tests/unit/ml2/drivers/mechanism_bulkless.py [deleted file]
neutron/tests/unit/ml2/test_ml2_plugin.py
neutron/tests/unit/test_db_plugin.py
setup.cfg

index 39f6c80a4420e6f94b80b1f0438d6c4e6cdc1add..c3d61fcd75cb8eb5b2aceb7dc11e838503cd6251 100644 (file)
@@ -55,8 +55,6 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
         # register plugin config opts
         pl_config.register_config()
         self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
-        # backend doesn't support bulk operations yet
-        self.native_bulk_support = False
 
         # init network ctrl connections
         self.servers = servermanager.ServerPool()
index 60d87960ea919299e76ad93f1e7a5ff712dac10d..0f6f6e850431e37f5cd7758a5e8369303976f9a4 100644 (file)
@@ -267,13 +267,9 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
                  [driver.name for driver in self.ordered_mech_drivers])
 
     def initialize(self):
-        # For ML2 to support bulk operations, each driver must support them
-        self.native_bulk_support = True
         for driver in self.ordered_mech_drivers:
             LOG.info(_LI("Initializing mechanism driver '%s'"), driver.name)
             driver.obj.initialize()
-            self.native_bulk_support &= getattr(driver.obj,
-                                                'native_bulk_support', True)
 
     def _call_on_drivers(self, method_name, context,
                          continue_on_failure=False):
index 18b044ea429240ed170141595c1ffcd264653972..6dd3625ff731c3390bb5935165940197ef4ed841 100644 (file)
@@ -126,8 +126,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.type_manager.initialize()
         self.extension_manager.initialize()
         self.mechanism_manager.initialize()
-        # bulk support depends on the underlying drivers
-        self.__native_bulk_support = self.mechanism_manager.native_bulk_support
 
         self._setup_rpc()
 
@@ -485,10 +483,59 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                                   segment[api.SEGMENTATION_ID],
                                   segment[api.PHYSICAL_NETWORK])
 
-    # TODO(apech): Need to override bulk operations
+    def _delete_objects(self, context, resource, objects):
+        delete_op = getattr(self, 'delete_%s' % resource)
+        for obj in objects:
+            try:
+                delete_op(context, obj['result']['id'])
+            except KeyError:
+                LOG.exception(_LE("Could not find %s to delete."),
+                              resource)
+            except Exception:
+                LOG.exception(_LE("Could not delete %(res)s %(id)s."),
+                              {'res': resource,
+                               'id': obj['result']['id']})
+
+    def _create_bulk_ml2(self, resource, context, request_items):
+        objects = []
+        collection = "%ss" % resource
+        items = request_items[collection]
+        try:
+            with context.session.begin(subtransactions=True):
+                obj_creator = getattr(self, '_create_%s_db' % resource)
+                for item in items:
+                    attrs = item[resource]
+                    result, mech_context = obj_creator(context, item)
+                    objects.append({'mech_context': mech_context,
+                                    'result': result,
+                                    'attributes': attrs})
+
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.exception(_LE("An exception occurred while creating "
+                                  "the %(resource)s:%(item)s"),
+                              {'resource': resource, 'item': item})
 
-    def create_network(self, context, network):
-        net_data = network['network']
+        try:
+            postcommit_op = getattr(self.mechanism_manager,
+                                    'create_%s_postcommit' % resource)
+            for obj in objects:
+                postcommit_op(obj['mech_context'])
+            return objects
+        except ml2_exc.MechanismDriverError:
+            with excutils.save_and_reraise_exception():
+                resource_ids = [res['result']['id'] for res in objects]
+                LOG.exception(_LE("mechanism_manager.create_%(res)s"
+                                  "_postcommit failed for %(res)s: "
+                                  "'%(failed_id)s'. Deleting "
+                                  "%(res)ss %(resource_ids)s"),
+                              {'res': resource,
+                               'failed_id': obj['result']['id'],
+                               'resource_ids': ', '.join(resource_ids)})
+                self._delete_objects(context, resource, objects)
+
+    def _create_network_db(self, context, network):
+        net_data = network[attributes.NETWORK]
         tenant_id = self._get_tenant_id_for_create(context, net_data)
         session = context.session
         with session.begin(subtransactions=True):
@@ -504,7 +551,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             mech_context = driver_context.NetworkContext(self, context,
                                                          result)
             self.mechanism_manager.create_network_precommit(mech_context)
+        return result, mech_context
 
+    def create_network(self, context, network):
+        result, mech_context = self._create_network_db(context, network)
         try:
             self.mechanism_manager.create_network_postcommit(mech_context)
         except ml2_exc.MechanismDriverError:
@@ -512,8 +562,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 LOG.error(_LE("mechanism_manager.create_network_postcommit "
                               "failed, deleting network '%s'"), result['id'])
                 self.delete_network(context, result['id'])
+
         return result
 
+    def create_network_bulk(self, context, networks):
+        objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
+        return [obj['result'] for obj in objects]
+
     def update_network(self, context, id, network):
         provider._raise_if_updates_provider_attributes(network['network'])
 
@@ -661,7 +716,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                           " failed"))
         self.notifier.network_delete(context, id)
 
-    def create_subnet(self, context, subnet):
+    def _create_subnet_db(self, context, subnet):
         session = context.session
         with session.begin(subtransactions=True):
             result = super(Ml2Plugin, self).create_subnet(context, subnet)
@@ -670,6 +725,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             mech_context = driver_context.SubnetContext(self, context, result)
             self.mechanism_manager.create_subnet_precommit(mech_context)
 
+        return result, mech_context
+
+    def create_subnet(self, context, subnet):
+        result, mech_context = self._create_subnet_db(context, subnet)
         try:
             self.mechanism_manager.create_subnet_postcommit(mech_context)
         except ml2_exc.MechanismDriverError:
@@ -679,6 +738,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 self.delete_subnet(context, result['id'])
         return result
 
+    def create_subnet_bulk(self, context, subnets):
+        objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
+        return [obj['result'] for obj in objects]
+
     def update_subnet(self, context, id, subnet):
         session = context.session
         with session.begin(subtransactions=True):
@@ -780,8 +843,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             # the fact that an error occurred.
             LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))
 
-    def create_port(self, context, port):
-        attrs = port['port']
+    def _create_port_db(self, context, port):
+        attrs = port[attributes.PORT]
         attrs['status'] = const.PORT_STATUS_DOWN
 
         session = context.session
@@ -796,7 +859,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             binding = db.add_port_binding(session, result['id'])
             mech_context = driver_context.PortContext(self, context, result,
                                                       network, binding)
-            new_host_port = self._get_host_port_if_changed(mech_context, attrs)
+
             self._process_port_binding(mech_context, attrs)
 
             result[addr_pair.ADDRESS_PAIRS] = (
@@ -807,7 +870,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                                                       dhcp_opts)
             self.mechanism_manager.create_port_precommit(mech_context)
 
-        # Notification must be sent after the above transaction is complete
+        return result, mech_context
+
+    def create_port(self, context, port):
+        attrs = port['port']
+        result, mech_context = self._create_port_db(context, port)
+        new_host_port = self._get_host_port_if_changed(mech_context, attrs)
         self._notify_l3_agent_new_port(context, new_host_port)
 
         try:
@@ -831,6 +899,35 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 self.delete_port(context, result['id'])
         return bound_context._port
 
+    def create_port_bulk(self, context, ports):
+        objects = self._create_bulk_ml2(attributes.PORT, context, ports)
+
+        for obj in objects:
+            # REVISIT(rkukura): Is there any point in calling this before
+            # a binding has been successfully established?
+            # TODO(banix): Use a single notification for all objects
+            self.notify_security_groups_member_updated(context,
+                                                       obj['result'])
+
+            attrs = obj['attributes']
+            if attrs and attrs.get(portbindings.HOST_ID):
+                new_host_port = self._get_host_port_if_changed(
+                    obj['mech_context'], attrs)
+                self._notify_l3_agent_new_port(context, new_host_port)
+
+        try:
+            for obj in objects:
+                obj['bound_context'] = self._bind_port_if_needed(
+                    obj['mech_context'])
+            return [obj['bound_context']._port for obj in objects]
+        except ml2_exc.MechanismDriverError:
+            with excutils.save_and_reraise_exception():
+                resource_ids = [res['result']['id'] for res in objects]
+                LOG.error(_LE("_bind_port_if_needed failed. "
+                              "Deleting all ports from create bulk '%s'"),
+                          resource_ids)
+                self._delete_objects(context, 'port', objects)
+
     def update_port(self, context, id, port):
         attrs = port['port']
         need_port_update_notify = False
index e349a612e72330fd6130b4e5ae8500250d139636..e1f6388ff508c6a1dbef42c80e286ff8a8120120 100644 (file)
@@ -309,7 +309,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
             plugin_obj = manager.NeutronManager.get_plugin()
             orig = plugin_obj.create_port
             with mock.patch.object(plugin_obj,
-                                   'create_port') as patched_plugin:
+                                   '_create_port_db') as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
@@ -343,7 +343,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
             plugin_obj = manager.NeutronManager.get_plugin()
             orig = plugin_obj.create_port
             with mock.patch.object(plugin_obj,
-                                   'create_port') as patched_plugin:
+                                   '_create_port_db') as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
@@ -718,7 +718,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase,
         with mock.patch('__builtin__.hasattr',
                         new=fakehasattr):
             with mock.patch.object(plugin_obj,
-                                   'create_network') as patched_plugin:
+                                   '_create_network_db') as patched_plugin:
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
                                                   *args, **kwargs)
@@ -737,7 +737,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase,
         plugin_obj = manager.NeutronManager.get_plugin()
         orig = plugin_obj.create_network
         with mock.patch.object(plugin_obj,
-                               'create_network') as patched_plugin:
+                               '_create_network_db') as patched_plugin:
 
             def side_effect(*args, **kwargs):
                 return self._fail_second_call(patched_plugin, orig,
@@ -769,7 +769,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase,
             plugin_obj = manager.NeutronManager.get_plugin()
             orig = plugin_obj.create_subnet
             with mock.patch.object(plugin_obj,
-                                   'create_subnet') as patched_plugin:
+                                   '_create_subnet_db') as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     self._fail_second_call(patched_plugin, orig,
@@ -792,7 +792,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase,
         plugin_obj = manager.NeutronManager.get_plugin()
         orig = plugin_obj.create_subnet
         with mock.patch.object(plugin_obj,
-                               'create_subnet') as patched_plugin:
+                               '_create_subnet_db') as patched_plugin:
             def side_effect(*args, **kwargs):
                 return self._fail_second_call(patched_plugin, orig,
                                               *args, **kwargs)
diff --git a/neutron/tests/unit/ml2/drivers/mechanism_bulkless.py b/neutron/tests/unit/ml2/drivers/mechanism_bulkless.py
deleted file mode 100644 (file)
index 0a0d3de..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (c) 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from neutron.plugins.ml2 import driver_api as api
-
-
-class BulklessMechanismDriver(api.MechanismDriver):
-    """Test mechanism driver for testing bulk emulation."""
-
-    def initialize(self):
-        self.native_bulk_support = False
index 4bf311598109b9106e6785f6588b99d81047aa30..37495dc31c24857e565624bf1939e8d668c14a0c 100644 (file)
@@ -90,14 +90,6 @@ class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
         self.context = context.get_admin_context()
 
 
-class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase):
-
-    _mechanism_drivers = ['logger', 'test', 'bulkless']
-
-    def test_bulk_disable_with_bulkless_driver(self):
-        self.assertTrue(self._skip_native_bulk)
-
-
 class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
 
     _mechanism_drivers = ['logger', 'test']
@@ -166,6 +158,23 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
         flips = l3plugin.get_floatingips(context.get_admin_context())
         self.assertFalse(flips)
 
+    def test_create_ports_bulk_port_binding_failure(self):
+        ctx = context.get_admin_context()
+        with self.network() as net:
+            plugin = manager.NeutronManager.get_plugin()
+
+            with mock.patch.object(plugin, '_bind_port_if_needed',
+                side_effect=ml2_exc.MechanismDriverError(
+                    method='create_port_bulk')) as _bind_port_if_needed:
+
+                res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
+                                             'test', True, context=ctx)
+
+                self.assertTrue(_bind_port_if_needed.called)
+                # We expect a 500 as we injected a fault in the plugin
+                self._validate_behavior_on_bulk_failure(
+                    res, 'ports', webob.exc.HTTPServerError.code)
+
     def test_delete_port_no_notify_in_disassociate_floatingips(self):
         ctx = context.get_admin_context()
         plugin = manager.NeutronManager.get_plugin()
index 6d2dd8c7524fb4be53e6ce1120f3bb1110a9201f..278de555330d6932bb0538dd84a0ade19225f3c6 100644 (file)
@@ -63,6 +63,17 @@ def _fake_get_sorting_helper(self, request):
     return api_common.SortingEmulatedHelper(request, self._attr_info)
 
 
+# TODO(banix): Move the following method to ML2 db test module when ML2
+# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase
+# instead of directly using NeutronDbPluginV2TestCase
+def _get_create_db_method(resource):
+    ml2_method = '_create_%s_db' % resource
+    if hasattr(manager.NeutronManager.get_plugin(), ml2_method):
+        return ml2_method
+    else:
+        return 'create_%s' % resource
+
+
 class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
                                 testlib_plugin.PluginSetupHelper):
     fmt = 'json'
@@ -883,8 +894,9 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
         with mock.patch('__builtin__.hasattr',
                         new=fakehasattr):
             orig = manager.NeutronManager.get_plugin().create_port
+            method_to_patch = _get_create_db_method('port')
             with mock.patch.object(manager.NeutronManager.get_plugin(),
-                                   'create_port') as patched_plugin:
+                                   method_to_patch) as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
@@ -908,7 +920,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
         with self.network() as net:
             plugin = manager.NeutronManager.get_plugin()
             orig = plugin.create_port
-            with mock.patch.object(plugin, 'create_port') as patched_plugin:
+            method_to_patch = _get_create_db_method('port')
+            with mock.patch.object(plugin, method_to_patch) as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
@@ -2073,8 +2086,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
         #ensures the API choose the emulation code path
         with mock.patch('__builtin__.hasattr',
                         new=fakehasattr):
+            method_to_patch = _get_create_db_method('network')
             with mock.patch.object(manager.NeutronManager.get_plugin(),
-                                   'create_network') as patched_plugin:
+                                   method_to_patch) as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     return self._fail_second_call(patched_plugin, orig,
@@ -2091,8 +2105,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
         if self._skip_native_bulk:
             self.skipTest("Plugin does not support native bulk network create")
         orig = manager.NeutronManager.get_plugin().create_network
+        method_to_patch = _get_create_db_method('network')
         with mock.patch.object(manager.NeutronManager.get_plugin(),
-                               'create_network') as patched_plugin:
+                               method_to_patch) as patched_plugin:
 
             def side_effect(*args, **kwargs):
                 return self._fail_second_call(patched_plugin, orig,
@@ -2513,8 +2528,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
         with mock.patch('__builtin__.hasattr',
                         new=fakehasattr):
             orig = manager.NeutronManager.get_plugin().create_subnet
+            method_to_patch = _get_create_db_method('subnet')
             with mock.patch.object(manager.NeutronManager.get_plugin(),
-                                   'create_subnet') as patched_plugin:
+                                   method_to_patch) as patched_plugin:
 
                 def side_effect(*args, **kwargs):
                     self._fail_second_call(patched_plugin, orig,
@@ -2536,7 +2552,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
             self.skipTest("Plugin does not support native bulk subnet create")
         plugin = manager.NeutronManager.get_plugin()
         orig = plugin.create_subnet
-        with mock.patch.object(plugin, 'create_subnet') as patched_plugin:
+        method_to_patch = _get_create_db_method('subnet')
+        with mock.patch.object(plugin, method_to_patch) as patched_plugin:
             def side_effect(*args, **kwargs):
                 return self._fail_second_call(patched_plugin, orig,
                                               *args, **kwargs)
index dc5b1a4502b499474fbf1c681d45e7db0757d146..75cf0fde455794cb00d98ce62ef6de73acd9a60c 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -173,7 +173,6 @@ neutron.ml2.mechanism_drivers =
     opendaylight = neutron.plugins.ml2.drivers.mechanism_odl:OpenDaylightMechanismDriver
     logger = neutron.tests.unit.ml2.drivers.mechanism_logger:LoggerMechanismDriver
     test = neutron.tests.unit.ml2.drivers.mechanism_test:TestMechanismDriver
-    bulkless = neutron.tests.unit.ml2.drivers.mechanism_bulkless:BulklessMechanismDriver
     linuxbridge = neutron.plugins.ml2.drivers.mech_linuxbridge:LinuxbridgeMechanismDriver
     openvswitch = neutron.plugins.ml2.drivers.mech_openvswitch:OpenvswitchMechanismDriver
     hyperv = neutron.plugins.ml2.drivers.mech_hyperv:HypervMechanismDriver