# register plugin config opts
pl_config.register_config()
self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
- # backend doesn't support bulk operations yet
- self.native_bulk_support = False
# init network ctrl connections
self.servers = servermanager.ServerPool()
[driver.name for driver in self.ordered_mech_drivers])
def initialize(self):
- # For ML2 to support bulk operations, each driver must support them
- self.native_bulk_support = True
for driver in self.ordered_mech_drivers:
LOG.info(_LI("Initializing mechanism driver '%s'"), driver.name)
driver.obj.initialize()
- self.native_bulk_support &= getattr(driver.obj,
- 'native_bulk_support', True)
def _call_on_drivers(self, method_name, context,
continue_on_failure=False):
self.type_manager.initialize()
self.extension_manager.initialize()
self.mechanism_manager.initialize()
- # bulk support depends on the underlying drivers
- self.__native_bulk_support = self.mechanism_manager.native_bulk_support
self._setup_rpc()
segment[api.SEGMENTATION_ID],
segment[api.PHYSICAL_NETWORK])
- # TODO(apech): Need to override bulk operations
+ def _delete_objects(self, context, resource, objects):
+ delete_op = getattr(self, 'delete_%s' % resource)
+ for obj in objects:
+ try:
+ delete_op(context, obj['result']['id'])
+ except KeyError:
+ LOG.exception(_LE("Could not find %s to delete."),
+ resource)
+ except Exception:
+ LOG.exception(_LE("Could not delete %(res)s %(id)s."),
+ {'res': resource,
+ 'id': obj['result']['id']})
+
+ def _create_bulk_ml2(self, resource, context, request_items):
+ objects = []
+ collection = "%ss" % resource
+ items = request_items[collection]
+ try:
+ with context.session.begin(subtransactions=True):
+ obj_creator = getattr(self, '_create_%s_db' % resource)
+ for item in items:
+ attrs = item[resource]
+ result, mech_context = obj_creator(context, item)
+ objects.append({'mech_context': mech_context,
+ 'result': result,
+ 'attributes': attrs})
+
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_LE("An exception occurred while creating "
+ "the %(resource)s:%(item)s"),
+ {'resource': resource, 'item': item})
- def create_network(self, context, network):
- net_data = network['network']
+ try:
+ postcommit_op = getattr(self.mechanism_manager,
+ 'create_%s_postcommit' % resource)
+ for obj in objects:
+ postcommit_op(obj['mech_context'])
+ return objects
+ except ml2_exc.MechanismDriverError:
+ with excutils.save_and_reraise_exception():
+ resource_ids = [res['result']['id'] for res in objects]
+ LOG.exception(_LE("mechanism_manager.create_%(res)s"
+ "_postcommit failed for %(res)s: "
+ "'%(failed_id)s'. Deleting "
+ "%(res)ss %(resource_ids)s"),
+ {'res': resource,
+ 'failed_id': obj['result']['id'],
+ 'resource_ids': ', '.join(resource_ids)})
+ self._delete_objects(context, resource, objects)
+
+ def _create_network_db(self, context, network):
+ net_data = network[attributes.NETWORK]
tenant_id = self._get_tenant_id_for_create(context, net_data)
session = context.session
with session.begin(subtransactions=True):
mech_context = driver_context.NetworkContext(self, context,
result)
self.mechanism_manager.create_network_precommit(mech_context)
+ return result, mech_context
+ def create_network(self, context, network):
+ result, mech_context = self._create_network_db(context, network)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
LOG.error(_LE("mechanism_manager.create_network_postcommit "
"failed, deleting network '%s'"), result['id'])
self.delete_network(context, result['id'])
+
return result
+ def create_network_bulk(self, context, networks):
+ objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
+ return [obj['result'] for obj in objects]
+
def update_network(self, context, id, network):
provider._raise_if_updates_provider_attributes(network['network'])
" failed"))
self.notifier.network_delete(context, id)
- def create_subnet(self, context, subnet):
+ def _create_subnet_db(self, context, subnet):
session = context.session
with session.begin(subtransactions=True):
result = super(Ml2Plugin, self).create_subnet(context, subnet)
mech_context = driver_context.SubnetContext(self, context, result)
self.mechanism_manager.create_subnet_precommit(mech_context)
+ return result, mech_context
+
+ def create_subnet(self, context, subnet):
+ result, mech_context = self._create_subnet_db(context, subnet)
try:
self.mechanism_manager.create_subnet_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
self.delete_subnet(context, result['id'])
return result
+ def create_subnet_bulk(self, context, subnets):
+ objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
+ return [obj['result'] for obj in objects]
+
def update_subnet(self, context, id, subnet):
session = context.session
with session.begin(subtransactions=True):
# the fact that an error occurred.
LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))
- def create_port(self, context, port):
- attrs = port['port']
+ def _create_port_db(self, context, port):
+ attrs = port[attributes.PORT]
attrs['status'] = const.PORT_STATUS_DOWN
session = context.session
binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result,
network, binding)
- new_host_port = self._get_host_port_if_changed(mech_context, attrs)
+
self._process_port_binding(mech_context, attrs)
result[addr_pair.ADDRESS_PAIRS] = (
dhcp_opts)
self.mechanism_manager.create_port_precommit(mech_context)
- # Notification must be sent after the above transaction is complete
+ return result, mech_context
+
+ def create_port(self, context, port):
+ attrs = port['port']
+ result, mech_context = self._create_port_db(context, port)
+ new_host_port = self._get_host_port_if_changed(mech_context, attrs)
self._notify_l3_agent_new_port(context, new_host_port)
try:
self.delete_port(context, result['id'])
return bound_context._port
+ def create_port_bulk(self, context, ports):
+ objects = self._create_bulk_ml2(attributes.PORT, context, ports)
+
+ for obj in objects:
+ # REVISIT(rkukura): Is there any point in calling this before
+ # a binding has been successfully established?
+ # TODO(banix): Use a single notification for all objects
+ self.notify_security_groups_member_updated(context,
+ obj['result'])
+
+ attrs = obj['attributes']
+ if attrs and attrs.get(portbindings.HOST_ID):
+ new_host_port = self._get_host_port_if_changed(
+ obj['mech_context'], attrs)
+ self._notify_l3_agent_new_port(context, new_host_port)
+
+ try:
+ for obj in objects:
+ obj['bound_context'] = self._bind_port_if_needed(
+ obj['mech_context'])
+ return [obj['bound_context']._port for obj in objects]
+ except ml2_exc.MechanismDriverError:
+ with excutils.save_and_reraise_exception():
+ resource_ids = [res['result']['id'] for res in objects]
+ LOG.error(_LE("_bind_port_if_needed failed. "
+ "Deleting all ports from create bulk '%s'"),
+ resource_ids)
+ self._delete_objects(context, 'port', objects)
+
def update_port(self, context, id, port):
attrs = port['port']
need_port_update_notify = False
plugin_obj = manager.NeutronManager.get_plugin()
orig = plugin_obj.create_port
with mock.patch.object(plugin_obj,
- 'create_port') as patched_plugin:
+ '_create_port_db') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
plugin_obj = manager.NeutronManager.get_plugin()
orig = plugin_obj.create_port
with mock.patch.object(plugin_obj,
- 'create_port') as patched_plugin:
+ '_create_port_db') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with mock.patch.object(plugin_obj,
- 'create_network') as patched_plugin:
+ '_create_network_db') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
plugin_obj = manager.NeutronManager.get_plugin()
orig = plugin_obj.create_network
with mock.patch.object(plugin_obj,
- 'create_network') as patched_plugin:
+ '_create_network_db') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
plugin_obj = manager.NeutronManager.get_plugin()
orig = plugin_obj.create_subnet
with mock.patch.object(plugin_obj,
- 'create_subnet') as patched_plugin:
+ '_create_subnet_db') as patched_plugin:
def side_effect(*args, **kwargs):
self._fail_second_call(patched_plugin, orig,
plugin_obj = manager.NeutronManager.get_plugin()
orig = plugin_obj.create_subnet
with mock.patch.object(plugin_obj,
- 'create_subnet') as patched_plugin:
+ '_create_subnet_db') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
+++ /dev/null
-# Copyright (c) 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.plugins.ml2 import driver_api as api
-
-
-class BulklessMechanismDriver(api.MechanismDriver):
- """Test mechanism driver for testing bulk emulation."""
-
- def initialize(self):
- self.native_bulk_support = False
self.context = context.get_admin_context()
-class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase):
-
- _mechanism_drivers = ['logger', 'test', 'bulkless']
-
- def test_bulk_disable_with_bulkless_driver(self):
- self.assertTrue(self._skip_native_bulk)
-
-
class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
_mechanism_drivers = ['logger', 'test']
flips = l3plugin.get_floatingips(context.get_admin_context())
self.assertFalse(flips)
+ def test_create_ports_bulk_port_binding_failure(self):
+ ctx = context.get_admin_context()
+ with self.network() as net:
+ plugin = manager.NeutronManager.get_plugin()
+
+ with mock.patch.object(plugin, '_bind_port_if_needed',
+ side_effect=ml2_exc.MechanismDriverError(
+ method='create_port_bulk')) as _bind_port_if_needed:
+
+ res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
+ 'test', True, context=ctx)
+
+ self.assertTrue(_bind_port_if_needed.called)
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(
+ res, 'ports', webob.exc.HTTPServerError.code)
+
def test_delete_port_no_notify_in_disassociate_floatingips(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
return api_common.SortingEmulatedHelper(request, self._attr_info)
+# TODO(banix): Move the following method to ML2 db test module when ML2
+# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase
+# instead of directly using NeutronDbPluginV2TestCase
+def _get_create_db_method(resource):
+ ml2_method = '_create_%s_db' % resource
+ if hasattr(manager.NeutronManager.get_plugin(), ml2_method):
+ return ml2_method
+ else:
+ return 'create_%s' % resource
+
+
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
testlib_plugin.PluginSetupHelper):
fmt = 'json'
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = manager.NeutronManager.get_plugin().create_port
+ method_to_patch = _get_create_db_method('port')
with mock.patch.object(manager.NeutronManager.get_plugin(),
- 'create_port') as patched_plugin:
+ method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
with self.network() as net:
plugin = manager.NeutronManager.get_plugin()
orig = plugin.create_port
- with mock.patch.object(plugin, 'create_port') as patched_plugin:
+ method_to_patch = _get_create_db_method('port')
+ with mock.patch.object(plugin, method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
#ensures the API choose the emulation code path
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
+ method_to_patch = _get_create_db_method('network')
with mock.patch.object(manager.NeutronManager.get_plugin(),
- 'create_network') as patched_plugin:
+ method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
orig = manager.NeutronManager.get_plugin().create_network
+ method_to_patch = _get_create_db_method('network')
with mock.patch.object(manager.NeutronManager.get_plugin(),
- 'create_network') as patched_plugin:
+ method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = manager.NeutronManager.get_plugin().create_subnet
+ method_to_patch = _get_create_db_method('subnet')
with mock.patch.object(manager.NeutronManager.get_plugin(),
- 'create_subnet') as patched_plugin:
+ method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
self._fail_second_call(patched_plugin, orig,
self.skipTest("Plugin does not support native bulk subnet create")
plugin = manager.NeutronManager.get_plugin()
orig = plugin.create_subnet
- with mock.patch.object(plugin, 'create_subnet') as patched_plugin:
+ method_to_patch = _get_create_db_method('subnet')
+ with mock.patch.object(plugin, method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
opendaylight = neutron.plugins.ml2.drivers.mechanism_odl:OpenDaylightMechanismDriver
logger = neutron.tests.unit.ml2.drivers.mechanism_logger:LoggerMechanismDriver
test = neutron.tests.unit.ml2.drivers.mechanism_test:TestMechanismDriver
- bulkless = neutron.tests.unit.ml2.drivers.mechanism_bulkless:BulklessMechanismDriver
linuxbridge = neutron.plugins.ml2.drivers.mech_linuxbridge:LinuxbridgeMechanismDriver
openvswitch = neutron.plugins.ml2.drivers.mech_openvswitch:OpenvswitchMechanismDriver
hyperv = neutron.plugins.ml2.drivers.mech_hyperv:HypervMechanismDriver