From: Mohammad Banikazemi Date: Sat, 6 Sep 2014 14:24:01 +0000 (-0400) Subject: Move postcommit ops out of transaction for bulk X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=2ee08c3464c53abaf9bc5493132ad7958611e3b8;p=openstack-build%2Fneutron-build.git Move postcommit ops out of transaction for bulk Currently, the bulk create operations in ML2 are executed in a transaction. This means all precommit and postcommit operations for such operations are in a transaction. Postcommit operations are expected to be executed outside of transactions as they may communicate with a backend and introduce substantial delays. This fix removes the postcommit operations from the transaction for bulk create network/subnet/port operations. Change-Id: I9a9683058088e50d9443040223232bf5e1396ccf Closes-Bug: #1193861 --- diff --git a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py index 39f6c80a4..c3d61fcd7 100644 --- a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py +++ b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py @@ -55,8 +55,6 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base, # register plugin config opts pl_config.register_config() self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size) - # backend doesn't support bulk operations yet - self.native_bulk_support = False # init network ctrl connections self.servers = servermanager.ServerPool() diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 60d87960e..0f6f6e850 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -267,13 +267,9 @@ class MechanismManager(stevedore.named.NamedExtensionManager): [driver.name for driver in self.ordered_mech_drivers]) def initialize(self): - # For ML2 to support bulk operations, each driver must support them - self.native_bulk_support = True for driver in self.ordered_mech_drivers: LOG.info(_LI("Initializing mechanism driver '%s'"), driver.name) driver.obj.initialize() - self.native_bulk_support &= getattr(driver.obj, - 'native_bulk_support', True) def _call_on_drivers(self, method_name, context, continue_on_failure=False): diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 18b044ea4..6dd3625ff 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -126,8 +126,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.type_manager.initialize() self.extension_manager.initialize() self.mechanism_manager.initialize() - # bulk support depends on the underlying drivers - self.__native_bulk_support = self.mechanism_manager.native_bulk_support self._setup_rpc() @@ -485,10 +483,59 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, segment[api.SEGMENTATION_ID], segment[api.PHYSICAL_NETWORK]) - # TODO(apech): Need to override bulk operations + def _delete_objects(self, context, resource, objects): + delete_op = getattr(self, 'delete_%s' % resource) + for obj in objects: + try: + delete_op(context, obj['result']['id']) + except KeyError: + LOG.exception(_LE("Could not find %s to delete."), + resource) + except Exception: + LOG.exception(_LE("Could not delete %(res)s %(id)s."), + {'res': resource, + 'id': obj['result']['id']}) + + def _create_bulk_ml2(self, resource, context, request_items): + objects = [] + collection = "%ss" % resource + items = request_items[collection] + try: + with context.session.begin(subtransactions=True): + obj_creator = getattr(self, '_create_%s_db' % resource) + for item in items: + attrs = item[resource] + result, mech_context = obj_creator(context, item) + objects.append({'mech_context': mech_context, + 'result': result, + 'attributes': attrs}) + + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(_LE("An exception occurred while creating " + "the %(resource)s:%(item)s"), + {'resource': resource, 'item': item}) - def create_network(self, context, network): - net_data = network['network'] + try: + postcommit_op = getattr(self.mechanism_manager, + 'create_%s_postcommit' % resource) + for obj in objects: + postcommit_op(obj['mech_context']) + return objects + except ml2_exc.MechanismDriverError: + with excutils.save_and_reraise_exception(): + resource_ids = [res['result']['id'] for res in objects] + LOG.exception(_LE("mechanism_manager.create_%(res)s" + "_postcommit failed for %(res)s: " + "'%(failed_id)s'. Deleting " + "%(res)ss %(resource_ids)s"), + {'res': resource, + 'failed_id': obj['result']['id'], + 'resource_ids': ', '.join(resource_ids)}) + self._delete_objects(context, resource, objects) + + def _create_network_db(self, context, network): + net_data = network[attributes.NETWORK] tenant_id = self._get_tenant_id_for_create(context, net_data) session = context.session with session.begin(subtransactions=True): @@ -504,7 +551,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.NetworkContext(self, context, result) self.mechanism_manager.create_network_precommit(mech_context) + return result, mech_context + def create_network(self, context, network): + result, mech_context = self._create_network_db(context, network) try: self.mechanism_manager.create_network_postcommit(mech_context) except ml2_exc.MechanismDriverError: @@ -512,8 +562,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.error(_LE("mechanism_manager.create_network_postcommit " "failed, deleting network '%s'"), result['id']) self.delete_network(context, result['id']) + return result + def create_network_bulk(self, context, networks): + objects = self._create_bulk_ml2(attributes.NETWORK, context, networks) + return [obj['result'] for obj in objects] + def update_network(self, context, id, network): provider._raise_if_updates_provider_attributes(network['network']) @@ -661,7 +716,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, " failed")) self.notifier.network_delete(context, id) - def create_subnet(self, context, subnet): + def _create_subnet_db(self, context, subnet): session = context.session with session.begin(subtransactions=True): result = super(Ml2Plugin, self).create_subnet(context, subnet) @@ -670,6 +725,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.SubnetContext(self, context, result) self.mechanism_manager.create_subnet_precommit(mech_context) + return result, mech_context + + def create_subnet(self, context, subnet): + result, mech_context = self._create_subnet_db(context, subnet) try: self.mechanism_manager.create_subnet_postcommit(mech_context) except ml2_exc.MechanismDriverError: @@ -679,6 +738,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.delete_subnet(context, result['id']) return result + def create_subnet_bulk(self, context, subnets): + objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets) + return [obj['result'] for obj in objects] + def update_subnet(self, context, id, subnet): session = context.session with session.begin(subtransactions=True): @@ -780,8 +843,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # the fact that an error occurred. LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed")) - def create_port(self, context, port): - attrs = port['port'] + def _create_port_db(self, context, port): + attrs = port[attributes.PORT] attrs['status'] = const.PORT_STATUS_DOWN session = context.session @@ -796,7 +859,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, binding = db.add_port_binding(session, result['id']) mech_context = driver_context.PortContext(self, context, result, network, binding) - new_host_port = self._get_host_port_if_changed(mech_context, attrs) + self._process_port_binding(mech_context, attrs) result[addr_pair.ADDRESS_PAIRS] = ( @@ -807,7 +870,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, dhcp_opts) self.mechanism_manager.create_port_precommit(mech_context) - # Notification must be sent after the above transaction is complete + return result, mech_context + + def create_port(self, context, port): + attrs = port['port'] + result, mech_context = self._create_port_db(context, port) + new_host_port = self._get_host_port_if_changed(mech_context, attrs) self._notify_l3_agent_new_port(context, new_host_port) try: @@ -831,6 +899,35 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.delete_port(context, result['id']) return bound_context._port + def create_port_bulk(self, context, ports): + objects = self._create_bulk_ml2(attributes.PORT, context, ports) + + for obj in objects: + # REVISIT(rkukura): Is there any point in calling this before + # a binding has been successfully established? + # TODO(banix): Use a single notification for all objects + self.notify_security_groups_member_updated(context, + obj['result']) + + attrs = obj['attributes'] + if attrs and attrs.get(portbindings.HOST_ID): + new_host_port = self._get_host_port_if_changed( + obj['mech_context'], attrs) + self._notify_l3_agent_new_port(context, new_host_port) + + try: + for obj in objects: + obj['bound_context'] = self._bind_port_if_needed( + obj['mech_context']) + return [obj['bound_context']._port for obj in objects] + except ml2_exc.MechanismDriverError: + with excutils.save_and_reraise_exception(): + resource_ids = [res['result']['id'] for res in objects] + LOG.error(_LE("_bind_port_if_needed failed. " + "Deleting all ports from create bulk '%s'"), + resource_ids) + self._delete_objects(context, 'port', objects) + def update_port(self, context, id, port): attrs = port['port'] need_port_update_notify = False diff --git a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py index e349a612e..e1f6388ff 100644 --- a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py +++ b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py @@ -309,7 +309,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, plugin_obj = manager.NeutronManager.get_plugin() orig = plugin_obj.create_port with mock.patch.object(plugin_obj, - 'create_port') as patched_plugin: + '_create_port_db') as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -343,7 +343,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, plugin_obj = manager.NeutronManager.get_plugin() orig = plugin_obj.create_port with mock.patch.object(plugin_obj, - 'create_port') as patched_plugin: + '_create_port_db') as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -718,7 +718,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase, with mock.patch('__builtin__.hasattr', new=fakehasattr): with mock.patch.object(plugin_obj, - 'create_network') as patched_plugin: + '_create_network_db') as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, *args, **kwargs) @@ -737,7 +737,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase, plugin_obj = manager.NeutronManager.get_plugin() orig = plugin_obj.create_network with mock.patch.object(plugin_obj, - 'create_network') as patched_plugin: + '_create_network_db') as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -769,7 +769,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase, plugin_obj = manager.NeutronManager.get_plugin() orig = plugin_obj.create_subnet with mock.patch.object(plugin_obj, - 'create_subnet') as patched_plugin: + '_create_subnet_db') as patched_plugin: def side_effect(*args, **kwargs): self._fail_second_call(patched_plugin, orig, @@ -792,7 +792,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase, plugin_obj = manager.NeutronManager.get_plugin() orig = plugin_obj.create_subnet with mock.patch.object(plugin_obj, - 'create_subnet') as patched_plugin: + '_create_subnet_db') as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, *args, **kwargs) diff --git a/neutron/tests/unit/ml2/drivers/mechanism_bulkless.py b/neutron/tests/unit/ml2/drivers/mechanism_bulkless.py deleted file mode 100644 index 0a0d3de93..000000000 --- a/neutron/tests/unit/ml2/drivers/mechanism_bulkless.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2014 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.plugins.ml2 import driver_api as api - - -class BulklessMechanismDriver(api.MechanismDriver): - """Test mechanism driver for testing bulk emulation.""" - - def initialize(self): - self.native_bulk_support = False diff --git a/neutron/tests/unit/ml2/test_ml2_plugin.py b/neutron/tests/unit/ml2/test_ml2_plugin.py index 4bf311598..37495dc31 100644 --- a/neutron/tests/unit/ml2/test_ml2_plugin.py +++ b/neutron/tests/unit/ml2/test_ml2_plugin.py @@ -90,14 +90,6 @@ class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): self.context = context.get_admin_context() -class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase): - - _mechanism_drivers = ['logger', 'test', 'bulkless'] - - def test_bulk_disable_with_bulkless_driver(self): - self.assertTrue(self._skip_native_bulk) - - class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase): _mechanism_drivers = ['logger', 'test'] @@ -166,6 +158,23 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): flips = l3plugin.get_floatingips(context.get_admin_context()) self.assertFalse(flips) + def test_create_ports_bulk_port_binding_failure(self): + ctx = context.get_admin_context() + with self.network() as net: + plugin = manager.NeutronManager.get_plugin() + + with mock.patch.object(plugin, '_bind_port_if_needed', + side_effect=ml2_exc.MechanismDriverError( + method='create_port_bulk')) as _bind_port_if_needed: + + res = self._create_port_bulk(self.fmt, 2, net['network']['id'], + 'test', True, context=ctx) + + self.assertTrue(_bind_port_if_needed.called) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure( + res, 'ports', webob.exc.HTTPServerError.code) + def test_delete_port_no_notify_in_disassociate_floatingips(self): ctx = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index 6d2dd8c75..278de5553 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -63,6 +63,17 @@ def _fake_get_sorting_helper(self, request): return api_common.SortingEmulatedHelper(request, self._attr_info) +# TODO(banix): Move the following method to ML2 db test module when ML2 +# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase +# instead of directly using NeutronDbPluginV2TestCase +def _get_create_db_method(resource): + ml2_method = '_create_%s_db' % resource + if hasattr(manager.NeutronManager.get_plugin(), ml2_method): + return ml2_method + else: + return 'create_%s' % resource + + class NeutronDbPluginV2TestCase(testlib_api.WebTestCase, testlib_plugin.PluginSetupHelper): fmt = 'json' @@ -883,8 +894,9 @@ class TestPortsV2(NeutronDbPluginV2TestCase): with mock.patch('__builtin__.hasattr', new=fakehasattr): orig = manager.NeutronManager.get_plugin().create_port + method_to_patch = _get_create_db_method('port') with mock.patch.object(manager.NeutronManager.get_plugin(), - 'create_port') as patched_plugin: + method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -908,7 +920,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase): with self.network() as net: plugin = manager.NeutronManager.get_plugin() orig = plugin.create_port - with mock.patch.object(plugin, 'create_port') as patched_plugin: + method_to_patch = _get_create_db_method('port') + with mock.patch.object(plugin, method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -2073,8 +2086,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): #ensures the API choose the emulation code path with mock.patch('__builtin__.hasattr', new=fakehasattr): + method_to_patch = _get_create_db_method('network') with mock.patch.object(manager.NeutronManager.get_plugin(), - 'create_network') as patched_plugin: + method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -2091,8 +2105,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk network create") orig = manager.NeutronManager.get_plugin().create_network + method_to_patch = _get_create_db_method('network') with mock.patch.object(manager.NeutronManager.get_plugin(), - 'create_network') as patched_plugin: + method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, @@ -2513,8 +2528,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): with mock.patch('__builtin__.hasattr', new=fakehasattr): orig = manager.NeutronManager.get_plugin().create_subnet + method_to_patch = _get_create_db_method('subnet') with mock.patch.object(manager.NeutronManager.get_plugin(), - 'create_subnet') as patched_plugin: + method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): self._fail_second_call(patched_plugin, orig, @@ -2536,7 +2552,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): self.skipTest("Plugin does not support native bulk subnet create") plugin = manager.NeutronManager.get_plugin() orig = plugin.create_subnet - with mock.patch.object(plugin, 'create_subnet') as patched_plugin: + method_to_patch = _get_create_db_method('subnet') + with mock.patch.object(plugin, method_to_patch) as patched_plugin: def side_effect(*args, **kwargs): return self._fail_second_call(patched_plugin, orig, *args, **kwargs) diff --git a/setup.cfg b/setup.cfg index dc5b1a450..75cf0fde4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -173,7 +173,6 @@ neutron.ml2.mechanism_drivers = opendaylight = neutron.plugins.ml2.drivers.mechanism_odl:OpenDaylightMechanismDriver logger = neutron.tests.unit.ml2.drivers.mechanism_logger:LoggerMechanismDriver test = neutron.tests.unit.ml2.drivers.mechanism_test:TestMechanismDriver - bulkless = neutron.tests.unit.ml2.drivers.mechanism_bulkless:BulklessMechanismDriver linuxbridge = neutron.plugins.ml2.drivers.mech_linuxbridge:LinuxbridgeMechanismDriver openvswitch = neutron.plugins.ml2.drivers.mech_openvswitch:OpenvswitchMechanismDriver hyperv = neutron.plugins.ml2.drivers.mech_hyperv:HypervMechanismDriver