]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix bulk create operations and make them atomic.
authorSalvatore Orlando <salv.orlando@gmail.com>
Mon, 13 Aug 2012 16:31:38 +0000 (09:31 -0700)
committerSalvatore Orlando <salv.orlando@gmail.com>
Mon, 13 Aug 2012 16:31:38 +0000 (09:31 -0700)
Bug 1024844
Bug 1020639

The API layer is now able to issue bulk create requests to the plugin,
assuming that the plugin supports them. Otherwise, the API layer will
emulate atomic behavior.
This patch also implements OVS plugin support for bulk requests.

Change-Id: I515148d870d0dff8371862fe577c477538364929

etc/quantum.conf
quantum/api/v2/base.py
quantum/api/v2/router.py
quantum/common/config.py
quantum/db/db_base_plugin_v2.py
quantum/plugins/linuxbridge/lb_quantum_plugin.py
quantum/plugins/openvswitch/ovs_db_v2.py
quantum/plugins/openvswitch/ovs_quantum_plugin.py
quantum/plugins/openvswitch/tests/unit/test_ovs_db.py
quantum/tests/unit/test_api_v2.py
quantum/tests/unit/test_db_plugin.py

index 97584d271e14618f03225c45ec42e4e0b8d63503..ad8e18de2a17caff21b8fcb0703296779d8d4a28 100644 (file)
@@ -39,6 +39,8 @@ api_paste_config = api-paste.ini
 # Maximum amount of retries to generate a unique MAC address
 # mac_generation_retries = 16
 
+# Enable or disable bulk create/update/delete operations
+# allow_bulk = True
 # RPC configuration options. Defined in rpc __init__
 # The messaging module to use, defaults to kombu.
 # rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu
index 5a013bb68546578347d922babbf938876484e3bc..e499fa9060a3605a35aed7cb41adac5318d6f773 100644 (file)
@@ -117,15 +117,23 @@ def verbose(request):
 
 
 class Controller(object):
-    def __init__(self, plugin, collection, resource, attr_info):
+    def __init__(self, plugin, collection, resource,
+                 attr_info, allow_bulk=False):
         self._plugin = plugin
         self._collection = collection
         self._resource = resource
         self._attr_info = attr_info
+        self._allow_bulk = allow_bulk
+        self._native_bulk = self._is_native_bulk_supported()
         self._policy_attrs = [name for (name, info) in self._attr_info.items()
                               if info.get('required_by_policy')]
         self._publisher_id = notifier_api.publisher_id('network')
 
+    def _is_native_bulk_supported(self):
+        native_bulk_attr_name = ("_%s__native_bulk_support"
+                                 % self._plugin.__class__.__name__)
+        return getattr(self._plugin, native_bulk_attr_name, False)
+
     def _is_visible(self, attr):
         attr_val = self._attr_info.get(attr)
         return attr_val and attr_val['is_visible']
@@ -209,6 +217,32 @@ class Controller(object):
             # doesn't exist
             raise webob.exc.HTTPNotFound()
 
+    def _emulate_bulk_create(self, obj_creator, request, body):
+        objs = []
+        try:
+            for item in body[self._collection]:
+                kwargs = {self._resource: item}
+                objs.append(self._view(obj_creator(request.context,
+                                                   **kwargs)))
+            return objs
+        # Note(salvatore-orlando): broad catch as in theory a plugin
+        # could raise any kind of exception
+        except Exception as ex:
+            for obj in objs:
+                delete_action = "delete_%s" % self._resource
+                obj_deleter = getattr(self._plugin, delete_action)
+                try:
+                    obj_deleter(request.context, obj['id'])
+                except Exception:
+                    # broad catch as our only purpose is to log the exception
+                    LOG.exception("Unable to undo add for %s %s",
+                                  self._resource, obj['id'])
+            # TODO(salvatore-orlando): The object being processed when the
+            # plugin raised might have been created or not in the db.
+            # We need a way for ensuring that if it has been created,
+            # it is then deleted
+            raise
+
     def create(self, request, body=None):
         """Creates a new instance of the requested entity"""
         notifier_api.notify(request.context,
@@ -216,10 +250,8 @@ class Controller(object):
                             self._resource + '.create.start',
                             notifier_api.INFO,
                             body)
-        body = self._prepare_request_body(request.context, body, True,
-                                          allow_bulk=True)
+        body = self._prepare_request_body(request.context, body, True)
         action = "create_%s" % self._resource
-
         # Check authz
         try:
             if self._collection in body:
@@ -256,16 +288,30 @@ class Controller(object):
             LOG.exception("Create operation not authorized")
             raise webob.exc.HTTPForbidden()
 
-        obj_creator = getattr(self._plugin, action)
-        kwargs = {self._resource: body}
-        obj = obj_creator(request.context, **kwargs)
-        result = {self._resource: self._view(obj)}
-        notifier_api.notify(request.context,
-                            self._publisher_id,
-                            self._resource + '.create.end',
-                            notifier_api.INFO,
-                            result)
-        return result
+        def notify(create_result):
+            notifier_api.notify(request.context,
+                                self._publisher_id,
+                                self._resource + '.create.end',
+                                notifier_api.INFO,
+                                create_result)
+            return create_result
+
+        if self._collection in body and self._native_bulk:
+            # plugin does atomic bulk create operations
+            obj_creator = getattr(self._plugin, "%s_bulk" % action)
+            objs = obj_creator(request.context, body)
+            return notify({self._collection: [self._view(obj)
+                                              for obj in objs]})
+        else:
+            obj_creator = getattr(self._plugin, action)
+            if self._collection in body:
+                # Emulate atomic bulk behavior
+                objs = self._emulate_bulk_create(obj_creator, request, body)
+                return notify({self._collection: objs})
+            else:
+                kwargs = {self._resource: body}
+                obj = obj_creator(request.context, **kwargs)
+                return notify({self._resource: self._view(obj)})
 
     def delete(self, request, id):
         """Deletes the specified entity"""
@@ -355,8 +401,7 @@ class Controller(object):
                         " that tenant_id is specified")
                 raise webob.exc.HTTPBadRequest(msg)
 
-    def _prepare_request_body(self, context, body, is_create,
-                              allow_bulk=False):
+    def _prepare_request_body(self, context, body, is_create):
         """ verifies required attributes are in request body, and that
             an attribute is only specified if it is allowed for the given
             operation (create/update).
@@ -369,7 +414,7 @@ class Controller(object):
             raise webob.exc.HTTPBadRequest(_("Resource body required"))
 
         body = body or {self._resource: {}}
-        if self._collection in body and allow_bulk:
+        if self._collection in body and self._allow_bulk:
             bulk_body = [self._prepare_request_body(context,
                                                     {self._resource: b},
                                                     is_create)
@@ -382,7 +427,7 @@ class Controller(object):
 
             return {self._collection: bulk_body}
 
-        elif self._collection in body and not allow_bulk:
+        elif self._collection in body and not self._allow_bulk:
             raise webob.exc.HTTPBadRequest("Bulk operation not supported")
 
         res_dict = body.get(self._resource)
@@ -459,8 +504,8 @@ class Controller(object):
             })
 
 
-def create_resource(collection, resource, plugin, params):
-    controller = Controller(plugin, collection, resource, params)
+def create_resource(collection, resource, plugin, params, allow_bulk=False):
+    controller = Controller(plugin, collection, resource, params, allow_bulk)
 
     # NOTE(jkoelker) To anyone wishing to add "proper" xml support
     #                this is where you do it
index af07e3ec2493c7693cc83d41f49aee8bfb0704cf..dcd719373dc366803fdfeade9b0cce76a227ccc7 100644 (file)
@@ -69,7 +69,6 @@ class APIRouter(wsgi.Router):
     def __init__(self, **local_config):
         mapper = routes_mapper.Mapper()
         plugin = manager.QuantumManager.get_plugin()
-
         ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
         ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
 
@@ -81,8 +80,10 @@ class APIRouter(wsgi.Router):
                      'port': 'ports'}
 
         def _map_resource(collection, resource, params):
+            allow_bulk = cfg.CONF.allow_bulk
             controller = base.create_resource(collection, resource,
-                                              plugin, params)
+                                              plugin, params,
+                                              allow_bulk=allow_bulk)
             mapper_kwargs = dict(controller=controller,
                                  requirements=REQUIREMENTS,
                                  **col_kwargs)
index eb84e9eb414307e677b45f23c485ed73fe2d41c7..6b463f3dee34917b6bd305ae077cccbcaeef5b8a 100644 (file)
@@ -43,7 +43,8 @@ core_opts = [
     cfg.StrOpt('core_plugin',
                default='quantum.plugins.sample.SamplePlugin.FakePlugin'),
     cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"),
-    cfg.IntOpt('mac_generation_retries', default=16)
+    cfg.IntOpt('mac_generation_retries', default=16),
+    cfg.BoolOpt('allow_bulk', default=True),
 ]
 
 # Register the configuration options
index e50d2e4c77e9445d75e354c9200de58d2b61e008..eb3f997a757b188d8620847a5e78a4869bf35e07 100644 (file)
@@ -41,6 +41,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
         certain events.
     """
 
+    # This attribute specifies whether the plugin supports or not
+    # bulk operations. Name mangling is used in order to ensure it
+    # is qualified by class
+    __native_bulk_support = True
+
     def __init__(self):
         # NOTE(jkoelker) This is an incomlete implementation. Subclasses
         #                must override __init__ and setup the database
@@ -673,12 +678,34 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
                "device_id": port["device_id"]}
         return self._fields(res, fields)
 
+    def _create_bulk(self, resource, context, request_items):
+        objects = []
+        collection = "%ss" % resource
+        items = request_items[collection]
+        context.session.begin(subtransactions=True)
+        try:
+            for item in items:
+                obj_creator = getattr(self, 'create_%s' % resource)
+                objects.append(obj_creator(context, item))
+            context.session.commit()
+        except Exception:
+            LOG.exception("An exception occured while creating "
+                          "the port:%s", item)
+            context.session.rollback()
+            raise
+        return objects
+
+    def create_network_bulk(self, context, networks):
+        return self._create_bulk('network', context, networks)
+
     def create_network(self, context, network):
+        """ handle creation of a single network """
+        # single request processing
         n = network['network']
         # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
         #                unneeded db action if the operation raises
         tenant_id = self._get_tenant_id_for_create(context, n)
-        with context.session.begin():
+        with context.session.begin(subtransactions=True):
             network = models_v2.Network(tenant_id=tenant_id,
                                         id=n.get('id') or utils.str_uuid(),
                                         name=n['name'],
@@ -721,6 +748,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
                                     filters=filters, fields=fields,
                                     verbose=verbose)
 
+    def create_subnet_bulk(self, context, subnets):
+        return self._create_bulk('subnet', context, subnets)
+
     def create_subnet(self, context, subnet):
         s = subnet['subnet']
         net = netaddr.IPNetwork(s['cidr'])
@@ -728,7 +758,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
             s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
 
         tenant_id = self._get_tenant_id_for_create(context, s)
-        with context.session.begin():
+        with context.session.begin(subtransactions=True):
             network = self._get_network(context, s["network_id"])
             self._validate_subnet_cidr(network, s['cidr'])
             subnet = models_v2.Subnet(tenant_id=tenant_id,
@@ -780,13 +810,16 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
                                     filters=filters, fields=fields,
                                     verbose=verbose)
 
+    def create_port_bulk(self, context, ports):
+        return self._create_bulk('port', context, ports)
+
     def create_port(self, context, port):
         p = port['port']
         # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
         #                unneeded db action if the operation raises
         tenant_id = self._get_tenant_id_for_create(context, p)
 
-        with context.session.begin():
+        with context.session.begin(subtransactions=True):
             network = self._get_network(context, p["network_id"])
 
             # Ensure that a MAC address is defined and it is unique on the
@@ -817,7 +850,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
 
         # Update the allocated IP's
         if ips:
-            with context.session.begin():
+            with context.session.begin(subtransactions=True):
                 for ip in ips:
                     LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'],
                               port['network_id'], ip['subnet_id'], port.id)
index 1b0507c8d57788e2295c1343a0e3611b85494cfb..80f88b9d489d0119574de7ef0052d811f62e0fe2 100644 (file)
@@ -196,7 +196,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
             super(LinuxBridgePluginV2, self).delete_network(context,
                                                             net['id'])
             raise
-
         return net
 
     def update_network(self, context, id, network):
index 7298e439d7882409b635204d5aa3f7e42cd3a36f..d7c29c1bb4216cf43cfa797dd89238ce0ecb9fbb 100644 (file)
@@ -40,8 +40,8 @@ def get_vlans():
     return [(binding.vlan_id, binding.network_id) for binding in bindings]
 
 
-def get_vlan(net_id):
-    session = db.get_session()
+def get_vlan(net_id, session=None):
+    session = session or db.get_session()
     try:
         binding = (session.query(ovs_models_v2.VlanBinding).
                    filter_by(network_id=net_id).
@@ -51,11 +51,10 @@ def get_vlan(net_id):
     return binding.vlan_id
 
 
-def add_vlan_binding(vlan_id, net_id):
-    session = db.get_session()
-    binding = ovs_models_v2.VlanBinding(vlan_id, net_id)
-    session.add(binding)
-    session.flush()
+def add_vlan_binding(vlan_id, net_id, session):
+    with session.begin(subtransactions=True):
+        binding = ovs_models_v2.VlanBinding(vlan_id, net_id)
+        session.add(binding)
     return binding
 
 
@@ -114,10 +113,9 @@ def get_vlan_id(vlan_id):
         return None
 
 
-def reserve_vlan_id():
+def reserve_vlan_id(session):
     """Reserve an unused vlan_id"""
 
-    session = db.get_session()
     with session.begin(subtransactions=True):
         record = (session.query(ovs_models_v2.VlanID).
                   filter_by(vlan_used=False).
@@ -129,14 +127,13 @@ def reserve_vlan_id():
     return record.vlan_id
 
 
-def reserve_specific_vlan_id(vlan_id):
+def reserve_specific_vlan_id(vlan_id, session):
     """Reserve a specific vlan_id"""
 
     if vlan_id < 1 or vlan_id > 4094:
         msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id
         raise q_exc.InvalidInput(error_message=msg)
 
-    session = db.get_session()
     with session.begin(subtransactions=True):
         try:
             record = (session.query(ovs_models_v2.VlanID).
index 068f4adc4d1b53508c5c5a74e7821d9fe19e72bd..6792cd54f54687eb3077ab7cbb6f16c5065b6c79 100644 (file)
@@ -177,6 +177,10 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
     be updated to take advantage of it.
     """
 
+    # This attribute specifies whether the plugin supports or not
+    # bulk operations. Name mangling is used in order to ensure it
+    # is qualified by class
+    __native_bulk_support = True
     supported_extension_aliases = ["provider"]
 
     def __init__(self, configfile=None):
@@ -227,7 +231,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
     def _extend_network_dict(self, context, network):
         if self._check_provider_view_auth(context, network):
             if not self.enable_tunneling:
-                network['provider:vlan_id'] = ovs_db_v2.get_vlan(network['id'])
+                network['provider:vlan_id'] = ovs_db_v2.get_vlan(
+                    network['id'], context.session)
 
     def create_network(self, context, network):
         net = super(OVSQuantumPluginV2, self).create_network(context, network)
@@ -235,15 +240,15 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
             vlan_id = network['network'].get('provider:vlan_id')
             if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED):
                 self._enforce_provider_set_auth(context, net)
-                ovs_db_v2.reserve_specific_vlan_id(vlan_id)
+                ovs_db_v2.reserve_specific_vlan_id(vlan_id, context.session)
             else:
-                vlan_id = ovs_db_v2.reserve_vlan_id()
+                vlan_id = ovs_db_v2.reserve_vlan_id(context.session)
         except Exception:
             super(OVSQuantumPluginV2, self).delete_network(context, net['id'])
             raise
 
         LOG.debug("Created network: %s" % net['id'])
-        ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']))
+        ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']), context.session)
         self._extend_network_dict(context, net)
         return net
 
index 707bb52505f2d073969f1da0f61d65b892ce7233..8f82ea434a31714d1a6d6afa35c34a3afbb024c5 100644 (file)
@@ -63,46 +63,50 @@ class OVSVlanIdsTest(unittest2.TestCase):
         self.assertIsNone(ovs_db_v2.get_vlan_id(VLAN_MAX + 5 + 1))
 
     def test_vlan_id_pool(self):
+        session = db.get_session()
         vlan_ids = set()
         for x in xrange(VLAN_MIN, VLAN_MAX + 1):
-            vlan_id = ovs_db_v2.reserve_vlan_id()
+            vlan_id = ovs_db_v2.reserve_vlan_id(db.get_session())
             self.assertGreaterEqual(vlan_id, VLAN_MIN)
             self.assertLessEqual(vlan_id, VLAN_MAX)
             vlan_ids.add(vlan_id)
 
         with self.assertRaises(q_exc.NoNetworkAvailable):
-            vlan_id = ovs_db_v2.reserve_vlan_id()
+            vlan_id = ovs_db_v2.reserve_vlan_id(session)
 
         for vlan_id in vlan_ids:
             ovs_db_v2.release_vlan_id(vlan_id)
 
     def test_invalid_specific_vlan_id(self):
+        session = db.get_session()
         with self.assertRaises(q_exc.InvalidInput):
-            vlan_id = ovs_db_v2.reserve_specific_vlan_id(0)
+            vlan_id = ovs_db_v2.reserve_specific_vlan_id(0, session)
 
         with self.assertRaises(q_exc.InvalidInput):
-            vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095)
+            vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095, session)
 
     def test_specific_vlan_id_inside_pool(self):
+        session = db.get_session()
         vlan_id = VLAN_MIN + 5
         self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
-        ovs_db_v2.reserve_specific_vlan_id(vlan_id)
+        ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
         self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
 
         with self.assertRaises(q_exc.VlanIdInUse):
-            ovs_db_v2.reserve_specific_vlan_id(vlan_id)
+            ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
 
         ovs_db_v2.release_vlan_id(vlan_id)
         self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
 
     def test_specific_vlan_id_outside_pool(self):
+        session = db.get_session()
         vlan_id = VLAN_MAX + 5
         self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id))
-        ovs_db_v2.reserve_specific_vlan_id(vlan_id)
+        ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
         self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used)
 
         with self.assertRaises(q_exc.VlanIdInUse):
-            ovs_db_v2.reserve_specific_vlan_id(vlan_id)
+            ovs_db_v2.reserve_specific_vlan_id(vlan_id, session)
 
         ovs_db_v2.release_vlan_id(vlan_id)
         self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id))
index f46eaadba280d23621b27f818159d6a92412734d..0c7360b8c75fc1a700bcbe2d8ee75fe684ef87de 100644 (file)
@@ -551,16 +551,17 @@ class JSONV2TestCase(APIv2TestBase):
         self.assertEqual(res.status_int, 422)
 
     def test_create_bulk(self):
-        data = {'networks': [{'name': 'net1', 'admin_state_up': True,
+        data = {'networks': [{'name': 'net1',
+                              'admin_state_up': True,
                               'tenant_id': _uuid()},
-                             {'name': 'net2', 'admin_state_up': True,
+                             {'name': 'net2',
+                              'admin_state_up': True,
                               'tenant_id': _uuid()}]}
 
         def side_effect(context, network):
-            nets = network.copy()
-            for net in nets['networks']:
-                net.update({'subnets': []})
-            return nets
+            net = network.copy()
+            net['network'].update({'subnets': []})
+            return net['network']
 
         instance = self.plugin.return_value
         instance.create_network.side_effect = side_effect
@@ -904,7 +905,6 @@ class ExtensionTestCase(unittest.TestCase):
         self.api = None
         self.plugin = None
         cfg.CONF.reset()
-
         # Restore the global RESOURCE_ATTRIBUTE_MAP
         attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
 
index 7dd902f31874acfcf04af8cd4a56693cf50d74f3..9c37e4c8d9b66ba72b8764cc0f330e1dbaf84cd4 100644 (file)
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import contextlib
+import copy
 import logging
 import mock
 import os
@@ -36,6 +37,7 @@ from quantum.wsgi import Serializer, JSONDeserializer
 
 LOG = logging.getLogger(__name__)
 
+DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
 ROOTDIR = os.path.dirname(os.path.dirname(__file__))
 ETCDIR = os.path.join(ROOTDIR, 'etc')
 
@@ -62,10 +64,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
             'application/json': json_deserializer,
         }
 
-        plugin = test_config.get('plugin_name_v2',
-                                 'quantum.db.db_base_plugin_v2.'
-                                 'QuantumDbPluginV2')
-        LOG.debug("db plugin test, the plugin is:%s", plugin)
+        plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
         # Create the default configurations
         args = ['--config-file', etcdir('quantum.conf.test')]
         config.parse(args=args)
@@ -74,6 +73,14 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
         cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
         self.api = APIRouter()
 
+        def _is_native_bulk_supported():
+            plugin_obj = QuantumManager.get_plugin()
+            native_bulk_attr_name = ("_%s__native_bulk_support"
+                                     % plugin_obj.__class__.__name__)
+            return getattr(plugin_obj, native_bulk_attr_name, False)
+
+        self._skip_native_bulk = not _is_native_bulk_supported()
+
     def tearDown(self):
         super(QuantumDbPluginV2TestCase, self).tearDown()
         # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
@@ -118,6 +125,28 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
         data = self._deserializers[ctype].deserialize(response.body)['body']
         return data
 
+    def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
+        """ Creates a bulk request for any kind of resource """
+        objects = []
+        collection = "%ss" % resource
+        for i in range(0, number):
+            obj = copy.deepcopy(data)
+            obj[resource]['name'] = "%s_%s" % (name, i)
+            if 'override' in kwargs and i in kwargs['override']:
+                obj[resource].update(kwargs['override'][i])
+            objects.append(obj)
+        req_data = {collection: objects}
+        req = self.new_create_request(collection, req_data, fmt)
+        if ('set_context' in kwargs and
+                kwargs['set_context'] is True and
+                'tenant_id' in kwargs):
+            # create a specific auth context for this request
+            req.environ['quantum.context'] = context.Context(
+                '', kwargs['tenant_id'])
+        elif 'context' in kwargs:
+            req.environ['quantum.context'] = kwargs['context']
+        return req.get_response(self.api)
+
     def _create_network(self, fmt, name, admin_status_up, **kwargs):
         data = {'network': {'name': name,
                             'admin_state_up': admin_status_up,
@@ -134,6 +163,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
 
         return network_req.get_response(self.api)
 
+    def _create_network_bulk(self, fmt, number, name,
+                             admin_status_up, **kwargs):
+        base_data = {'network': {'admin_state_up': admin_status_up,
+                                 'tenant_id': self._tenant_id}}
+        return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
+
     def _create_subnet(self, fmt, net_id, cidr,
                        expected_res_status=None, **kwargs):
         data = {'subnet': {'network_id': net_id,
@@ -157,6 +192,19 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
             self.assertEqual(subnet_res.status_int, expected_res_status)
         return subnet_res
 
+    def _create_subnet_bulk(self, fmt, number, net_id, name,
+                            ip_version=4, **kwargs):
+        base_data = {'subnet': {'network_id': net_id,
+                                'ip_version': ip_version,
+                                'tenant_id': self._tenant_id}}
+        # auto-generate cidrs as they should not overlap
+        overrides = dict((k, v)
+                         for (k, v) in zip(range(0, number),
+                                           [{'cidr': "10.0.%s.0/24" % num}
+                                            for num in range(0, number)]))
+        kwargs.update({'override': overrides})
+        return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
+
     def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
         content_type = 'application/' + fmt
         data = {'port': {'network_id': net_id,
@@ -196,6 +244,13 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
             self.assertEqual(port_res.status_int, expected_res_status)
         return port_res
 
+    def _create_port_bulk(self, fmt, number, net_id, name,
+                          admin_status_up, **kwargs):
+        base_data = {'port': {'network_id': net_id,
+                              'admin_state_up': admin_status_up,
+                              'tenant_id': self._tenant_id}}
+        return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
+
     def _make_subnet(self, fmt, network, gateway, cidr,
                      allocation_pools=None, ip_version=4, enable_dhcp=True):
         res = self._create_subnet(fmt,
@@ -220,6 +275,29 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
         req = self.new_delete_request(collection, id)
         req.get_response(self.api)
 
+    def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
+        """ Invoked by test cases for injecting failures in plugin """
+        def second_call(*args, **kwargs):
+            raise Exception('boom')
+        patched_plugin.side_effect = second_call
+        return orig(*args, **kwargs)
+
+    def _validate_behavior_on_bulk_failure(self, res, collection):
+        self.assertEqual(res.status_int, 500)
+        req = self.new_list_request(collection)
+        res = req.get_response(self.api)
+        self.assertEquals(res.status_int, 200)
+        items = self.deserialize('json', res)
+        self.assertEqual(len(items[collection]), 0)
+
+    def _validate_behavior_on_bulk_success(self, res, collection,
+                                           names=['test_0', 'test_1']):
+        self.assertEqual(res.status_int, 201)
+        items = self.deserialize('json', res)[collection]
+        self.assertEqual(len(items), 2)
+        self.assertEqual(items[0]['name'], 'test_0')
+        self.assertEqual(items[1]['name'], 'test_1')
+
     @contextlib.contextmanager
     def network(self, name='net1',
                 admin_status_up=True,
@@ -429,6 +507,90 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
                 self.assertEquals(port['port'][k], v)
             self.assertTrue('mac_address' in port['port'])
 
+    def test_create_ports_bulk_native(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk port create")
+        with self.network() as net:
+            res = self._create_port_bulk('json', 2, net['network']['id'],
+                                         'test', True)
+            self._validate_behavior_on_bulk_success(res, 'ports')
+
+    def test_create_ports_bulk_emulated(self):
+        real_has_attr = hasattr
+
+        #ensures the API choose the emulation code path
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            with self.network() as net:
+                res = self._create_port_bulk('json', 2, net['network']['id'],
+                                             'test', True)
+                self._validate_behavior_on_bulk_success(res, 'ports')
+
+    def test_create_ports_bulk_wrong_input(self):
+        with self.network() as net:
+            overrides = {1: {'admin_state_up': 'doh'}}
+            res = self._create_port_bulk('json', 2, net['network']['id'],
+                                         'test', True,
+                                         override=overrides)
+            self.assertEqual(res.status_int, 400)
+            req = self.new_list_request('ports')
+            res = req.get_response(self.api)
+            self.assertEquals(res.status_int, 200)
+            ports = self.deserialize('json', res)
+            self.assertEqual(len(ports['ports']), 0)
+
+    def test_create_ports_bulk_emulated_plugin_failure(self):
+        real_has_attr = hasattr
+
+        #ensures the API choose the emulation code path
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            orig = QuantumManager.get_plugin().create_port
+            with mock.patch.object(QuantumManager.get_plugin(),
+                                   'create_port') as patched_plugin:
+
+                def side_effect(*args, **kwargs):
+                    return self._do_side_effect(patched_plugin, orig,
+                                                *args, **kwargs)
+
+                patched_plugin.side_effect = side_effect
+                with self.network() as net:
+                    res = self._create_port_bulk('json', 2,
+                                                 net['network']['id'],
+                                                 'test',
+                                                 True)
+                    # We expect a 500 as we injected a fault in the plugin
+                    self._validate_behavior_on_bulk_failure(res, 'ports')
+
+    def test_create_ports_bulk_native_plugin_failure(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk port create")
+        ctx = context.get_admin_context()
+        with self.network() as net:
+            orig = QuantumManager._instance.plugin.create_port
+            with mock.patch.object(QuantumManager._instance.plugin,
+                                   'create_port') as patched_plugin:
+
+                def side_effect(*args, **kwargs):
+                    return self._do_side_effect(patched_plugin, orig,
+                                                *args, **kwargs)
+
+                patched_plugin.side_effect = side_effect
+                res = self._create_port_bulk('json', 2, net['network']['id'],
+                                             'test', True, context=ctx)
+                # We expect a 500 as we injected a fault in the plugin
+                self._validate_behavior_on_bulk_failure(res, 'ports')
+
     def test_list_ports(self):
         with contextlib.nested(self.port(), self.port()) as (port1, port2):
             req = self.new_list_request('ports', 'json')
@@ -1061,6 +1223,77 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
                                           network['network']['id'])
             self.assertEqual(req.get_response(self.api).status_int, 409)
 
+    def test_create_networks_bulk_native(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk network create")
+        res = self._create_network_bulk('json', 2, 'test', True)
+        self._validate_behavior_on_bulk_success(res, 'networks')
+
+    def test_create_networks_bulk_emulated(self):
+        real_has_attr = hasattr
+
+        #ensures the API choose the emulation code path
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            res = self._create_network_bulk('json', 2, 'test', True)
+            self._validate_behavior_on_bulk_success(res, 'networks')
+
+    def test_create_networks_bulk_wrong_input(self):
+        res = self._create_network_bulk('json', 2, 'test', True,
+                                        override={1:
+                                                  {'admin_state_up': 'doh'}})
+        self.assertEqual(res.status_int, 400)
+        req = self.new_list_request('networks')
+        res = req.get_response(self.api)
+        self.assertEquals(res.status_int, 200)
+        nets = self.deserialize('json', res)
+        self.assertEqual(len(nets['networks']), 0)
+
+    def test_create_networks_bulk_emulated_plugin_failure(self):
+        real_has_attr = hasattr
+
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        orig = QuantumManager.get_plugin().create_network
+        #ensures the API choose the emulation code path
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            with mock.patch.object(QuantumManager.get_plugin(),
+                                   'create_network') as patched_plugin:
+
+                def side_effect(*args, **kwargs):
+                    return self._do_side_effect(patched_plugin, orig,
+                                                *args, **kwargs)
+
+                patched_plugin.side_effect = side_effect
+                res = self._create_network_bulk('json', 2, 'test', True)
+                # We expect a 500 as we injected a fault in the plugin
+                self._validate_behavior_on_bulk_failure(res, 'networks')
+
+    def test_create_networks_bulk_native_plugin_failure(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk network create")
+        orig = QuantumManager.get_plugin().create_network
+        with mock.patch.object(QuantumManager.get_plugin(),
+                               'create_network') as patched_plugin:
+
+            def side_effect(*args, **kwargs):
+                return self._do_side_effect(patched_plugin, orig,
+                                            *args, **kwargs)
+
+            patched_plugin.side_effect = side_effect
+            res = self._create_network_bulk('json', 2, 'test', True)
+            # We expect a 500 as we injected a fault in the plugin
+            self._validate_behavior_on_bulk_failure(res, 'networks')
+
     def test_list_networks(self):
         with self.network(name='net1') as net1:
             with self.network(name='net2') as net2:
@@ -1157,6 +1390,77 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
                         pass
                 self.assertEquals(ctx_manager.exception.code, 400)
 
+    def test_create_subnets_bulk_native(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk subnet create")
+        with self.network() as net:
+            res = self._create_subnet_bulk('json', 2, net['network']['id'],
+                                           'test')
+            self._validate_behavior_on_bulk_success(res, 'subnets')
+
+    def test_create_subnets_bulk_emulated(self):
+        real_has_attr = hasattr
+
+        #ensures the API choose the emulation code path
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            with self.network() as net:
+                res = self._create_subnet_bulk('json', 2,
+                                               net['network']['id'],
+                                               'test')
+                self._validate_behavior_on_bulk_success(res, 'subnets')
+
+    def test_create_subnets_bulk_emulated_plugin_failure(self):
+        real_has_attr = hasattr
+
+        #ensures the API choose the emulation code path
+        def fakehasattr(item, attr):
+            if attr.endswith('__native_bulk_support'):
+                return False
+            return real_has_attr(item, attr)
+
+        with mock.patch('__builtin__.hasattr',
+                        new=fakehasattr):
+            orig = QuantumManager.get_plugin().create_subnet
+            with mock.patch.object(QuantumManager.get_plugin(),
+                                   'create_subnet') as patched_plugin:
+
+                def side_effect(*args, **kwargs):
+                    self._do_side_effect(patched_plugin, orig,
+                                         *args, **kwargs)
+
+                patched_plugin.side_effect = side_effect
+                with self.network() as net:
+                    res = self._create_subnet_bulk('json', 2,
+                                                   net['network']['id'],
+                                                   'test')
+                # We expect a 500 as we injected a fault in the plugin
+                self._validate_behavior_on_bulk_failure(res, 'subnets')
+
+    def test_create_subnets_bulk_native_plugin_failure(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk subnet create")
+        orig = QuantumManager._instance.plugin.create_subnet
+        with mock.patch.object(QuantumManager._instance.plugin,
+                               'create_subnet') as patched_plugin:
+            def side_effect(*args, **kwargs):
+                return self._do_side_effect(patched_plugin, orig,
+                                            *args, **kwargs)
+
+            patched_plugin.side_effect = side_effect
+            with self.network() as net:
+                res = self._create_subnet_bulk('json', 2,
+                                               net['network']['id'],
+                                               'test')
+
+                # We expect a 500 as we injected a fault in the plugin
+                self._validate_behavior_on_bulk_failure(res, 'subnets')
+
     def test_delete_subnet(self):
         gateway_ip = '10.0.0.1'
         cidr = '10.0.0.0/24'