From: Salvatore Orlando Date: Mon, 13 Aug 2012 16:31:38 +0000 (-0700) Subject: Fix bulk create operations and make them atomic. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=ff837025e955799e69eebcab665374436649d6f8;p=openstack-build%2Fneutron-build.git Fix bulk create operations and make them atomic. Bug 1024844 Bug 1020639 The API layer is now able to issue bulk create requests to the plugin, assuming that the plugin supports them. Otherwise, the API layer will emulate atomic behavior. This patch also implements OVS plugin support for bulk requests. Change-Id: I515148d870d0dff8371862fe577c477538364929 --- diff --git a/etc/quantum.conf b/etc/quantum.conf index 97584d271..ad8e18de2 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -39,6 +39,8 @@ api_paste_config = api-paste.ini # Maximum amount of retries to generate a unique MAC address # mac_generation_retries = 16 +# Enable or disable bulk create/update/delete operations +# allow_bulk = True # RPC configuration options. Defined in rpc __init__ # The messaging module to use, defaults to kombu. # rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu diff --git a/quantum/api/v2/base.py b/quantum/api/v2/base.py index 5a013bb68..e499fa906 100644 --- a/quantum/api/v2/base.py +++ b/quantum/api/v2/base.py @@ -117,15 +117,23 @@ def verbose(request): class Controller(object): - def __init__(self, plugin, collection, resource, attr_info): + def __init__(self, plugin, collection, resource, + attr_info, allow_bulk=False): self._plugin = plugin self._collection = collection self._resource = resource self._attr_info = attr_info + self._allow_bulk = allow_bulk + self._native_bulk = self._is_native_bulk_supported() self._policy_attrs = [name for (name, info) in self._attr_info.items() if info.get('required_by_policy')] self._publisher_id = notifier_api.publisher_id('network') + def _is_native_bulk_supported(self): + native_bulk_attr_name = ("_%s__native_bulk_support" + % self._plugin.__class__.__name__) + return getattr(self._plugin, native_bulk_attr_name, False) + def _is_visible(self, attr): attr_val = self._attr_info.get(attr) return attr_val and attr_val['is_visible'] @@ -209,6 +217,32 @@ class Controller(object): # doesn't exist raise webob.exc.HTTPNotFound() + def _emulate_bulk_create(self, obj_creator, request, body): + objs = [] + try: + for item in body[self._collection]: + kwargs = {self._resource: item} + objs.append(self._view(obj_creator(request.context, + **kwargs))) + return objs + # Note(salvatore-orlando): broad catch as in theory a plugin + # could raise any kind of exception + except Exception as ex: + for obj in objs: + delete_action = "delete_%s" % self._resource + obj_deleter = getattr(self._plugin, delete_action) + try: + obj_deleter(request.context, obj['id']) + except Exception: + # broad catch as our only purpose is to log the exception + LOG.exception("Unable to undo add for %s %s", + self._resource, obj['id']) + # TODO(salvatore-orlando): The object being processed when the + # plugin raised might have been created or not in the db. + # We need a way for ensuring that if it has been created, + # it is then deleted + raise + def create(self, request, body=None): """Creates a new instance of the requested entity""" notifier_api.notify(request.context, @@ -216,10 +250,8 @@ class Controller(object): self._resource + '.create.start', notifier_api.INFO, body) - body = self._prepare_request_body(request.context, body, True, - allow_bulk=True) + body = self._prepare_request_body(request.context, body, True) action = "create_%s" % self._resource - # Check authz try: if self._collection in body: @@ -256,16 +288,30 @@ class Controller(object): LOG.exception("Create operation not authorized") raise webob.exc.HTTPForbidden() - obj_creator = getattr(self._plugin, action) - kwargs = {self._resource: body} - obj = obj_creator(request.context, **kwargs) - result = {self._resource: self._view(obj)} - notifier_api.notify(request.context, - self._publisher_id, - self._resource + '.create.end', - notifier_api.INFO, - result) - return result + def notify(create_result): + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.create.end', + notifier_api.INFO, + create_result) + return create_result + + if self._collection in body and self._native_bulk: + # plugin does atomic bulk create operations + obj_creator = getattr(self._plugin, "%s_bulk" % action) + objs = obj_creator(request.context, body) + return notify({self._collection: [self._view(obj) + for obj in objs]}) + else: + obj_creator = getattr(self._plugin, action) + if self._collection in body: + # Emulate atomic bulk behavior + objs = self._emulate_bulk_create(obj_creator, request, body) + return notify({self._collection: objs}) + else: + kwargs = {self._resource: body} + obj = obj_creator(request.context, **kwargs) + return notify({self._resource: self._view(obj)}) def delete(self, request, id): """Deletes the specified entity""" @@ -355,8 +401,7 @@ class Controller(object): " that tenant_id is specified") raise webob.exc.HTTPBadRequest(msg) - def _prepare_request_body(self, context, body, is_create, - allow_bulk=False): + def _prepare_request_body(self, context, body, is_create): """ verifies required attributes are in request body, and that an attribute is only specified if it is allowed for the given operation (create/update). @@ -369,7 +414,7 @@ class Controller(object): raise webob.exc.HTTPBadRequest(_("Resource body required")) body = body or {self._resource: {}} - if self._collection in body and allow_bulk: + if self._collection in body and self._allow_bulk: bulk_body = [self._prepare_request_body(context, {self._resource: b}, is_create) @@ -382,7 +427,7 @@ class Controller(object): return {self._collection: bulk_body} - elif self._collection in body and not allow_bulk: + elif self._collection in body and not self._allow_bulk: raise webob.exc.HTTPBadRequest("Bulk operation not supported") res_dict = body.get(self._resource) @@ -459,8 +504,8 @@ class Controller(object): }) -def create_resource(collection, resource, plugin, params): - controller = Controller(plugin, collection, resource, params) +def create_resource(collection, resource, plugin, params, allow_bulk=False): + controller = Controller(plugin, collection, resource, params, allow_bulk) # NOTE(jkoelker) To anyone wishing to add "proper" xml support # this is where you do it diff --git a/quantum/api/v2/router.py b/quantum/api/v2/router.py index af07e3ec2..dcd719373 100644 --- a/quantum/api/v2/router.py +++ b/quantum/api/v2/router.py @@ -69,7 +69,6 @@ class APIRouter(wsgi.Router): def __init__(self, **local_config): mapper = routes_mapper.Mapper() plugin = manager.QuantumManager.get_plugin() - ext_mgr = extensions.PluginAwareExtensionManager.get_instance() ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP) @@ -81,8 +80,10 @@ class APIRouter(wsgi.Router): 'port': 'ports'} def _map_resource(collection, resource, params): + allow_bulk = cfg.CONF.allow_bulk controller = base.create_resource(collection, resource, - plugin, params) + plugin, params, + allow_bulk=allow_bulk) mapper_kwargs = dict(controller=controller, requirements=REQUIREMENTS, **col_kwargs) diff --git a/quantum/common/config.py b/quantum/common/config.py index eb84e9eb4..6b463f3de 100644 --- a/quantum/common/config.py +++ b/quantum/common/config.py @@ -43,7 +43,8 @@ core_opts = [ cfg.StrOpt('core_plugin', default='quantum.plugins.sample.SamplePlugin.FakePlugin'), cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"), - cfg.IntOpt('mac_generation_retries', default=16) + cfg.IntOpt('mac_generation_retries', default=16), + cfg.BoolOpt('allow_bulk', default=True), ] # Register the configuration options diff --git a/quantum/db/db_base_plugin_v2.py b/quantum/db/db_base_plugin_v2.py index e50d2e4c7..eb3f997a7 100644 --- a/quantum/db/db_base_plugin_v2.py +++ b/quantum/db/db_base_plugin_v2.py @@ -41,6 +41,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): certain events. """ + # This attribute specifies whether the plugin supports or not + # bulk operations. Name mangling is used in order to ensure it + # is qualified by class + __native_bulk_support = True + def __init__(self): # NOTE(jkoelker) This is an incomlete implementation. Subclasses # must override __init__ and setup the database @@ -673,12 +678,34 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): "device_id": port["device_id"]} return self._fields(res, fields) + def _create_bulk(self, resource, context, request_items): + objects = [] + collection = "%ss" % resource + items = request_items[collection] + context.session.begin(subtransactions=True) + try: + for item in items: + obj_creator = getattr(self, 'create_%s' % resource) + objects.append(obj_creator(context, item)) + context.session.commit() + except Exception: + LOG.exception("An exception occured while creating " + "the port:%s", item) + context.session.rollback() + raise + return objects + + def create_network_bulk(self, context, networks): + return self._create_bulk('network', context, networks) + def create_network(self, context, network): + """ handle creation of a single network """ + # single request processing n = network['network'] # NOTE(jkoelker) Get the tenant_id outside of the session to avoid # unneeded db action if the operation raises tenant_id = self._get_tenant_id_for_create(context, n) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = models_v2.Network(tenant_id=tenant_id, id=n.get('id') or utils.str_uuid(), name=n['name'], @@ -721,6 +748,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): filters=filters, fields=fields, verbose=verbose) + def create_subnet_bulk(self, context, subnets): + return self._create_bulk('subnet', context, subnets) + def create_subnet(self, context, subnet): s = subnet['subnet'] net = netaddr.IPNetwork(s['cidr']) @@ -728,7 +758,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1)) tenant_id = self._get_tenant_id_for_create(context, s) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = self._get_network(context, s["network_id"]) self._validate_subnet_cidr(network, s['cidr']) subnet = models_v2.Subnet(tenant_id=tenant_id, @@ -780,13 +810,16 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): filters=filters, fields=fields, verbose=verbose) + def create_port_bulk(self, context, ports): + return self._create_bulk('port', context, ports) + def create_port(self, context, port): p = port['port'] # NOTE(jkoelker) Get the tenant_id outside of the session to avoid # unneeded db action if the operation raises tenant_id = self._get_tenant_id_for_create(context, p) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = self._get_network(context, p["network_id"]) # Ensure that a MAC address is defined and it is unique on the @@ -817,7 +850,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): # Update the allocated IP's if ips: - with context.session.begin(): + with context.session.begin(subtransactions=True): for ip in ips: LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'], port['network_id'], ip['subnet_id'], port.id) diff --git a/quantum/plugins/linuxbridge/lb_quantum_plugin.py b/quantum/plugins/linuxbridge/lb_quantum_plugin.py index 1b0507c8d..80f88b9d4 100644 --- a/quantum/plugins/linuxbridge/lb_quantum_plugin.py +++ b/quantum/plugins/linuxbridge/lb_quantum_plugin.py @@ -196,7 +196,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): super(LinuxBridgePluginV2, self).delete_network(context, net['id']) raise - return net def update_network(self, context, id, network): diff --git a/quantum/plugins/openvswitch/ovs_db_v2.py b/quantum/plugins/openvswitch/ovs_db_v2.py index 7298e439d..d7c29c1bb 100644 --- a/quantum/plugins/openvswitch/ovs_db_v2.py +++ b/quantum/plugins/openvswitch/ovs_db_v2.py @@ -40,8 +40,8 @@ def get_vlans(): return [(binding.vlan_id, binding.network_id) for binding in bindings] -def get_vlan(net_id): - session = db.get_session() +def get_vlan(net_id, session=None): + session = session or db.get_session() try: binding = (session.query(ovs_models_v2.VlanBinding). filter_by(network_id=net_id). @@ -51,11 +51,10 @@ def get_vlan(net_id): return binding.vlan_id -def add_vlan_binding(vlan_id, net_id): - session = db.get_session() - binding = ovs_models_v2.VlanBinding(vlan_id, net_id) - session.add(binding) - session.flush() +def add_vlan_binding(vlan_id, net_id, session): + with session.begin(subtransactions=True): + binding = ovs_models_v2.VlanBinding(vlan_id, net_id) + session.add(binding) return binding @@ -114,10 +113,9 @@ def get_vlan_id(vlan_id): return None -def reserve_vlan_id(): +def reserve_vlan_id(session): """Reserve an unused vlan_id""" - session = db.get_session() with session.begin(subtransactions=True): record = (session.query(ovs_models_v2.VlanID). filter_by(vlan_used=False). @@ -129,14 +127,13 @@ def reserve_vlan_id(): return record.vlan_id -def reserve_specific_vlan_id(vlan_id): +def reserve_specific_vlan_id(vlan_id, session): """Reserve a specific vlan_id""" if vlan_id < 1 or vlan_id > 4094: msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id raise q_exc.InvalidInput(error_message=msg) - session = db.get_session() with session.begin(subtransactions=True): try: record = (session.query(ovs_models_v2.VlanID). diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index 068f4adc4..6792cd54f 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -177,6 +177,10 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): be updated to take advantage of it. """ + # This attribute specifies whether the plugin supports or not + # bulk operations. Name mangling is used in order to ensure it + # is qualified by class + __native_bulk_support = True supported_extension_aliases = ["provider"] def __init__(self, configfile=None): @@ -227,7 +231,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): def _extend_network_dict(self, context, network): if self._check_provider_view_auth(context, network): if not self.enable_tunneling: - network['provider:vlan_id'] = ovs_db_v2.get_vlan(network['id']) + network['provider:vlan_id'] = ovs_db_v2.get_vlan( + network['id'], context.session) def create_network(self, context, network): net = super(OVSQuantumPluginV2, self).create_network(context, network) @@ -235,15 +240,15 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): vlan_id = network['network'].get('provider:vlan_id') if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED): self._enforce_provider_set_auth(context, net) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, context.session) else: - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(context.session) except Exception: super(OVSQuantumPluginV2, self).delete_network(context, net['id']) raise LOG.debug("Created network: %s" % net['id']) - ovs_db_v2.add_vlan_binding(vlan_id, str(net['id'])) + ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']), context.session) self._extend_network_dict(context, net) return net diff --git a/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py b/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py index 707bb5250..8f82ea434 100644 --- a/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py +++ b/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py @@ -63,46 +63,50 @@ class OVSVlanIdsTest(unittest2.TestCase): self.assertIsNone(ovs_db_v2.get_vlan_id(VLAN_MAX + 5 + 1)) def test_vlan_id_pool(self): + session = db.get_session() vlan_ids = set() for x in xrange(VLAN_MIN, VLAN_MAX + 1): - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(db.get_session()) self.assertGreaterEqual(vlan_id, VLAN_MIN) self.assertLessEqual(vlan_id, VLAN_MAX) vlan_ids.add(vlan_id) with self.assertRaises(q_exc.NoNetworkAvailable): - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(session) for vlan_id in vlan_ids: ovs_db_v2.release_vlan_id(vlan_id) def test_invalid_specific_vlan_id(self): + session = db.get_session() with self.assertRaises(q_exc.InvalidInput): - vlan_id = ovs_db_v2.reserve_specific_vlan_id(0) + vlan_id = ovs_db_v2.reserve_specific_vlan_id(0, session) with self.assertRaises(q_exc.InvalidInput): - vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095) + vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095, session) def test_specific_vlan_id_inside_pool(self): + session = db.get_session() vlan_id = VLAN_MIN + 5 self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) with self.assertRaises(q_exc.VlanIdInUse): - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) ovs_db_v2.release_vlan_id(vlan_id) self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) def test_specific_vlan_id_outside_pool(self): + session = db.get_session() vlan_id = VLAN_MAX + 5 self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) with self.assertRaises(q_exc.VlanIdInUse): - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) ovs_db_v2.release_vlan_id(vlan_id) self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) diff --git a/quantum/tests/unit/test_api_v2.py b/quantum/tests/unit/test_api_v2.py index f46eaadba..0c7360b8c 100644 --- a/quantum/tests/unit/test_api_v2.py +++ b/quantum/tests/unit/test_api_v2.py @@ -551,16 +551,17 @@ class JSONV2TestCase(APIv2TestBase): self.assertEqual(res.status_int, 422) def test_create_bulk(self): - data = {'networks': [{'name': 'net1', 'admin_state_up': True, + data = {'networks': [{'name': 'net1', + 'admin_state_up': True, 'tenant_id': _uuid()}, - {'name': 'net2', 'admin_state_up': True, + {'name': 'net2', + 'admin_state_up': True, 'tenant_id': _uuid()}]} def side_effect(context, network): - nets = network.copy() - for net in nets['networks']: - net.update({'subnets': []}) - return nets + net = network.copy() + net['network'].update({'subnets': []}) + return net['network'] instance = self.plugin.return_value instance.create_network.side_effect = side_effect @@ -904,7 +905,6 @@ class ExtensionTestCase(unittest.TestCase): self.api = None self.plugin = None cfg.CONF.reset() - # Restore the global RESOURCE_ATTRIBUTE_MAP attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index 7dd902f31..9c37e4c8d 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -14,6 +14,7 @@ # limitations under the License. import contextlib +import copy import logging import mock import os @@ -36,6 +37,7 @@ from quantum.wsgi import Serializer, JSONDeserializer LOG = logging.getLogger(__name__) +DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' ROOTDIR = os.path.dirname(os.path.dirname(__file__)) ETCDIR = os.path.join(ROOTDIR, 'etc') @@ -62,10 +64,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): 'application/json': json_deserializer, } - plugin = test_config.get('plugin_name_v2', - 'quantum.db.db_base_plugin_v2.' - 'QuantumDbPluginV2') - LOG.debug("db plugin test, the plugin is:%s", plugin) + plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS) # Create the default configurations args = ['--config-file', etcdir('quantum.conf.test')] config.parse(args=args) @@ -74,6 +73,14 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") self.api = APIRouter() + def _is_native_bulk_supported(): + plugin_obj = QuantumManager.get_plugin() + native_bulk_attr_name = ("_%s__native_bulk_support" + % plugin_obj.__class__.__name__) + return getattr(plugin_obj, native_bulk_attr_name, False) + + self._skip_native_bulk = not _is_native_bulk_supported() + def tearDown(self): super(QuantumDbPluginV2TestCase, self).tearDown() # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure @@ -118,6 +125,28 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): data = self._deserializers[ctype].deserialize(response.body)['body'] return data + def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs): + """ Creates a bulk request for any kind of resource """ + objects = [] + collection = "%ss" % resource + for i in range(0, number): + obj = copy.deepcopy(data) + obj[resource]['name'] = "%s_%s" % (name, i) + if 'override' in kwargs and i in kwargs['override']: + obj[resource].update(kwargs['override'][i]) + objects.append(obj) + req_data = {collection: objects} + req = self.new_create_request(collection, req_data, fmt) + if ('set_context' in kwargs and + kwargs['set_context'] is True and + 'tenant_id' in kwargs): + # create a specific auth context for this request + req.environ['quantum.context'] = context.Context( + '', kwargs['tenant_id']) + elif 'context' in kwargs: + req.environ['quantum.context'] = kwargs['context'] + return req.get_response(self.api) + def _create_network(self, fmt, name, admin_status_up, **kwargs): data = {'network': {'name': name, 'admin_state_up': admin_status_up, @@ -134,6 +163,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): return network_req.get_response(self.api) + def _create_network_bulk(self, fmt, number, name, + admin_status_up, **kwargs): + base_data = {'network': {'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + return self._create_bulk(fmt, number, 'network', base_data, **kwargs) + def _create_subnet(self, fmt, net_id, cidr, expected_res_status=None, **kwargs): data = {'subnet': {'network_id': net_id, @@ -157,6 +192,19 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self.assertEqual(subnet_res.status_int, expected_res_status) return subnet_res + def _create_subnet_bulk(self, fmt, number, net_id, name, + ip_version=4, **kwargs): + base_data = {'subnet': {'network_id': net_id, + 'ip_version': ip_version, + 'tenant_id': self._tenant_id}} + # auto-generate cidrs as they should not overlap + overrides = dict((k, v) + for (k, v) in zip(range(0, number), + [{'cidr': "10.0.%s.0/24" % num} + for num in range(0, number)])) + kwargs.update({'override': overrides}) + return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) + def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs): content_type = 'application/' + fmt data = {'port': {'network_id': net_id, @@ -196,6 +244,13 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self.assertEqual(port_res.status_int, expected_res_status) return port_res + def _create_port_bulk(self, fmt, number, net_id, name, + admin_status_up, **kwargs): + base_data = {'port': {'network_id': net_id, + 'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + return self._create_bulk(fmt, number, 'port', base_data, **kwargs) + def _make_subnet(self, fmt, network, gateway, cidr, allocation_pools=None, ip_version=4, enable_dhcp=True): res = self._create_subnet(fmt, @@ -220,6 +275,29 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): req = self.new_delete_request(collection, id) req.get_response(self.api) + def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): + """ Invoked by test cases for injecting failures in plugin """ + def second_call(*args, **kwargs): + raise Exception('boom') + patched_plugin.side_effect = second_call + return orig(*args, **kwargs) + + def _validate_behavior_on_bulk_failure(self, res, collection): + self.assertEqual(res.status_int, 500) + req = self.new_list_request(collection) + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + items = self.deserialize('json', res) + self.assertEqual(len(items[collection]), 0) + + def _validate_behavior_on_bulk_success(self, res, collection, + names=['test_0', 'test_1']): + self.assertEqual(res.status_int, 201) + items = self.deserialize('json', res)[collection] + self.assertEqual(len(items), 2) + self.assertEqual(items[0]['name'], 'test_0') + self.assertEqual(items[1]['name'], 'test_1') + @contextlib.contextmanager def network(self, name='net1', admin_status_up=True, @@ -429,6 +507,90 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(port['port'][k], v) self.assertTrue('mac_address' in port['port']) + def test_create_ports_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk port create") + with self.network() as net: + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True) + self._validate_behavior_on_bulk_success(res, 'ports') + + def test_create_ports_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with self.network() as net: + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True) + self._validate_behavior_on_bulk_success(res, 'ports') + + def test_create_ports_bulk_wrong_input(self): + with self.network() as net: + overrides = {1: {'admin_state_up': 'doh'}} + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True, + override=overrides) + self.assertEqual(res.status_int, 400) + req = self.new_list_request('ports') + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + ports = self.deserialize('json', res) + self.assertEqual(len(ports['ports']), 0) + + def test_create_ports_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + orig = QuantumManager.get_plugin().create_port + with mock.patch.object(QuantumManager.get_plugin(), + 'create_port') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_port_bulk('json', 2, + net['network']['id'], + 'test', + True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'ports') + + def test_create_ports_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk port create") + ctx = context.get_admin_context() + with self.network() as net: + orig = QuantumManager._instance.plugin.create_port + with mock.patch.object(QuantumManager._instance.plugin, + 'create_port') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True, context=ctx) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'ports') + def test_list_ports(self): with contextlib.nested(self.port(), self.port()) as (port1, port2): req = self.new_list_request('ports', 'json') @@ -1061,6 +1223,77 @@ class TestNetworksV2(QuantumDbPluginV2TestCase): network['network']['id']) self.assertEqual(req.get_response(self.api).status_int, 409) + def test_create_networks_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk network create") + res = self._create_network_bulk('json', 2, 'test', True) + self._validate_behavior_on_bulk_success(res, 'networks') + + def test_create_networks_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + res = self._create_network_bulk('json', 2, 'test', True) + self._validate_behavior_on_bulk_success(res, 'networks') + + def test_create_networks_bulk_wrong_input(self): + res = self._create_network_bulk('json', 2, 'test', True, + override={1: + {'admin_state_up': 'doh'}}) + self.assertEqual(res.status_int, 400) + req = self.new_list_request('networks') + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + nets = self.deserialize('json', res) + self.assertEqual(len(nets['networks']), 0) + + def test_create_networks_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + orig = QuantumManager.get_plugin().create_network + #ensures the API choose the emulation code path + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with mock.patch.object(QuantumManager.get_plugin(), + 'create_network') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_network_bulk('json', 2, 'test', True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'networks') + + def test_create_networks_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk network create") + orig = QuantumManager.get_plugin().create_network + with mock.patch.object(QuantumManager.get_plugin(), + 'create_network') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_network_bulk('json', 2, 'test', True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'networks') + def test_list_networks(self): with self.network(name='net1') as net1: with self.network(name='net2') as net2: @@ -1157,6 +1390,77 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase): pass self.assertEquals(ctx_manager.exception.code, 400) + def test_create_subnets_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk subnet create") + with self.network() as net: + res = self._create_subnet_bulk('json', 2, net['network']['id'], + 'test') + self._validate_behavior_on_bulk_success(res, 'subnets') + + def test_create_subnets_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + self._validate_behavior_on_bulk_success(res, 'subnets') + + def test_create_subnets_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + orig = QuantumManager.get_plugin().create_subnet + with mock.patch.object(QuantumManager.get_plugin(), + 'create_subnet') as patched_plugin: + + def side_effect(*args, **kwargs): + self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'subnets') + + def test_create_subnets_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk subnet create") + orig = QuantumManager._instance.plugin.create_subnet + with mock.patch.object(QuantumManager._instance.plugin, + 'create_subnet') as patched_plugin: + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'subnets') + def test_delete_subnet(self): gateway_ip = '10.0.0.1' cidr = '10.0.0.0/24'