class Controller(object):
- def __init__(self, plugin, collection, resource, attr_info):
+ def __init__(self, plugin, collection, resource,
+ attr_info, allow_bulk=False):
self._plugin = plugin
self._collection = collection
self._resource = resource
self._attr_info = attr_info
+ self._allow_bulk = allow_bulk
+ self._native_bulk = self._is_native_bulk_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')
+ def _is_native_bulk_supported(self):
+ native_bulk_attr_name = ("_%s__native_bulk_support"
+ % self._plugin.__class__.__name__)
+ return getattr(self._plugin, native_bulk_attr_name, False)
+
def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible']
# doesn't exist
raise webob.exc.HTTPNotFound()
+ def _emulate_bulk_create(self, obj_creator, request, body):
+ objs = []
+ try:
+ for item in body[self._collection]:
+ kwargs = {self._resource: item}
+ objs.append(self._view(obj_creator(request.context,
+ **kwargs)))
+ return objs
+ # Note(salvatore-orlando): broad catch as in theory a plugin
+ # could raise any kind of exception
+ except Exception as ex:
+ for obj in objs:
+ delete_action = "delete_%s" % self._resource
+ obj_deleter = getattr(self._plugin, delete_action)
+ try:
+ obj_deleter(request.context, obj['id'])
+ except Exception:
+ # broad catch as our only purpose is to log the exception
+ LOG.exception("Unable to undo add for %s %s",
+ self._resource, obj['id'])
+ # TODO(salvatore-orlando): The object being processed when the
+ # plugin raised might have been created or not in the db.
+ # We need a way for ensuring that if it has been created,
+ # it is then deleted
+ raise
+
def create(self, request, body=None):
"""Creates a new instance of the requested entity"""
notifier_api.notify(request.context,
self._resource + '.create.start',
notifier_api.INFO,
body)
- body = self._prepare_request_body(request.context, body, True,
- allow_bulk=True)
+ body = self._prepare_request_body(request.context, body, True)
action = "create_%s" % self._resource
-
# Check authz
try:
if self._collection in body:
LOG.exception("Create operation not authorized")
raise webob.exc.HTTPForbidden()
- obj_creator = getattr(self._plugin, action)
- kwargs = {self._resource: body}
- obj = obj_creator(request.context, **kwargs)
- result = {self._resource: self._view(obj)}
- notifier_api.notify(request.context,
- self._publisher_id,
- self._resource + '.create.end',
- notifier_api.INFO,
- result)
- return result
+ def notify(create_result):
+ notifier_api.notify(request.context,
+ self._publisher_id,
+ self._resource + '.create.end',
+ notifier_api.INFO,
+ create_result)
+ return create_result
+
+ if self._collection in body and self._native_bulk:
+ # plugin does atomic bulk create operations
+ obj_creator = getattr(self._plugin, "%s_bulk" % action)
+ objs = obj_creator(request.context, body)
+ return notify({self._collection: [self._view(obj)
+ for obj in objs]})
+ else:
+ obj_creator = getattr(self._plugin, action)
+ if self._collection in body:
+ # Emulate atomic bulk behavior
+ objs = self._emulate_bulk_create(obj_creator, request, body)
+ return notify({self._collection: objs})
+ else:
+ kwargs = {self._resource: body}
+ obj = obj_creator(request.context, **kwargs)
+ return notify({self._resource: self._view(obj)})
def delete(self, request, id):
"""Deletes the specified entity"""
" that tenant_id is specified")
raise webob.exc.HTTPBadRequest(msg)
- def _prepare_request_body(self, context, body, is_create,
- allow_bulk=False):
+ def _prepare_request_body(self, context, body, is_create):
""" verifies required attributes are in request body, and that
an attribute is only specified if it is allowed for the given
operation (create/update).
raise webob.exc.HTTPBadRequest(_("Resource body required"))
body = body or {self._resource: {}}
- if self._collection in body and allow_bulk:
+ if self._collection in body and self._allow_bulk:
bulk_body = [self._prepare_request_body(context,
{self._resource: b},
is_create)
return {self._collection: bulk_body}
- elif self._collection in body and not allow_bulk:
+ elif self._collection in body and not self._allow_bulk:
raise webob.exc.HTTPBadRequest("Bulk operation not supported")
res_dict = body.get(self._resource)
})
-def create_resource(collection, resource, plugin, params):
- controller = Controller(plugin, collection, resource, params)
+def create_resource(collection, resource, plugin, params, allow_bulk=False):
+ controller = Controller(plugin, collection, resource, params, allow_bulk)
# NOTE(jkoelker) To anyone wishing to add "proper" xml support
# this is where you do it
certain events.
"""
+ # This attribute specifies whether the plugin supports or not
+ # bulk operations. Name mangling is used in order to ensure it
+ # is qualified by class
+ __native_bulk_support = True
+
def __init__(self):
# NOTE(jkoelker) This is an incomlete implementation. Subclasses
# must override __init__ and setup the database
"device_id": port["device_id"]}
return self._fields(res, fields)
+ def _create_bulk(self, resource, context, request_items):
+ objects = []
+ collection = "%ss" % resource
+ items = request_items[collection]
+ context.session.begin(subtransactions=True)
+ try:
+ for item in items:
+ obj_creator = getattr(self, 'create_%s' % resource)
+ objects.append(obj_creator(context, item))
+ context.session.commit()
+ except Exception:
+ LOG.exception("An exception occured while creating "
+ "the port:%s", item)
+ context.session.rollback()
+ raise
+ return objects
+
+ def create_network_bulk(self, context, networks):
+ return self._create_bulk('network', context, networks)
+
def create_network(self, context, network):
+ """ handle creation of a single network """
+ # single request processing
n = network['network']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, n)
- with context.session.begin():
+ with context.session.begin(subtransactions=True):
network = models_v2.Network(tenant_id=tenant_id,
id=n.get('id') or utils.str_uuid(),
name=n['name'],
filters=filters, fields=fields,
verbose=verbose)
+ def create_subnet_bulk(self, context, subnets):
+ return self._create_bulk('subnet', context, subnets)
+
def create_subnet(self, context, subnet):
s = subnet['subnet']
net = netaddr.IPNetwork(s['cidr'])
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
tenant_id = self._get_tenant_id_for_create(context, s)
- with context.session.begin():
+ with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
subnet = models_v2.Subnet(tenant_id=tenant_id,
filters=filters, fields=fields,
verbose=verbose)
+ def create_port_bulk(self, context, ports):
+ return self._create_bulk('port', context, ports)
+
def create_port(self, context, port):
p = port['port']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)
- with context.session.begin():
+ with context.session.begin(subtransactions=True):
network = self._get_network(context, p["network_id"])
# Ensure that a MAC address is defined and it is unique on the
# Update the allocated IP's
if ips:
- with context.session.begin():
+ with context.session.begin(subtransactions=True):
for ip in ips:
LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'],
port['network_id'], ip['subnet_id'], port.id)
# limitations under the License.
import contextlib
+import copy
import logging
import mock
import os
LOG = logging.getLogger(__name__)
+DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
'application/json': json_deserializer,
}
- plugin = test_config.get('plugin_name_v2',
- 'quantum.db.db_base_plugin_v2.'
- 'QuantumDbPluginV2')
- LOG.debug("db plugin test, the plugin is:%s", plugin)
+ plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
# Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')]
config.parse(args=args)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
self.api = APIRouter()
+ def _is_native_bulk_supported():
+ plugin_obj = QuantumManager.get_plugin()
+ native_bulk_attr_name = ("_%s__native_bulk_support"
+ % plugin_obj.__class__.__name__)
+ return getattr(plugin_obj, native_bulk_attr_name, False)
+
+ self._skip_native_bulk = not _is_native_bulk_supported()
+
def tearDown(self):
super(QuantumDbPluginV2TestCase, self).tearDown()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
data = self._deserializers[ctype].deserialize(response.body)['body']
return data
+ def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
+ """ Creates a bulk request for any kind of resource """
+ objects = []
+ collection = "%ss" % resource
+ for i in range(0, number):
+ obj = copy.deepcopy(data)
+ obj[resource]['name'] = "%s_%s" % (name, i)
+ if 'override' in kwargs and i in kwargs['override']:
+ obj[resource].update(kwargs['override'][i])
+ objects.append(obj)
+ req_data = {collection: objects}
+ req = self.new_create_request(collection, req_data, fmt)
+ if ('set_context' in kwargs and
+ kwargs['set_context'] is True and
+ 'tenant_id' in kwargs):
+ # create a specific auth context for this request
+ req.environ['quantum.context'] = context.Context(
+ '', kwargs['tenant_id'])
+ elif 'context' in kwargs:
+ req.environ['quantum.context'] = kwargs['context']
+ return req.get_response(self.api)
+
def _create_network(self, fmt, name, admin_status_up, **kwargs):
data = {'network': {'name': name,
'admin_state_up': admin_status_up,
return network_req.get_response(self.api)
+ def _create_network_bulk(self, fmt, number, name,
+ admin_status_up, **kwargs):
+ base_data = {'network': {'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
+
def _create_subnet(self, fmt, net_id, cidr,
expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id,
self.assertEqual(subnet_res.status_int, expected_res_status)
return subnet_res
+ def _create_subnet_bulk(self, fmt, number, net_id, name,
+ ip_version=4, **kwargs):
+ base_data = {'subnet': {'network_id': net_id,
+ 'ip_version': ip_version,
+ 'tenant_id': self._tenant_id}}
+ # auto-generate cidrs as they should not overlap
+ overrides = dict((k, v)
+ for (k, v) in zip(range(0, number),
+ [{'cidr': "10.0.%s.0/24" % num}
+ for num in range(0, number)]))
+ kwargs.update({'override': overrides})
+ return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
+
def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs):
content_type = 'application/' + fmt
data = {'port': {'network_id': net_id,
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
+ def _create_port_bulk(self, fmt, number, net_id, name,
+ admin_status_up, **kwargs):
+ base_data = {'port': {'network_id': net_id,
+ 'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
+
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True):
res = self._create_subnet(fmt,
req = self.new_delete_request(collection, id)
req.get_response(self.api)
+ def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
+ """ Invoked by test cases for injecting failures in plugin """
+ def second_call(*args, **kwargs):
+ raise Exception('boom')
+ patched_plugin.side_effect = second_call
+ return orig(*args, **kwargs)
+
+ def _validate_behavior_on_bulk_failure(self, res, collection):
+ self.assertEqual(res.status_int, 500)
+ req = self.new_list_request(collection)
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 200)
+ items = self.deserialize('json', res)
+ self.assertEqual(len(items[collection]), 0)
+
+ def _validate_behavior_on_bulk_success(self, res, collection,
+ names=['test_0', 'test_1']):
+ self.assertEqual(res.status_int, 201)
+ items = self.deserialize('json', res)[collection]
+ self.assertEqual(len(items), 2)
+ self.assertEqual(items[0]['name'], 'test_0')
+ self.assertEqual(items[1]['name'], 'test_1')
+
@contextlib.contextmanager
def network(self, name='net1',
admin_status_up=True,
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
+ def test_create_ports_bulk_native(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk port create")
+ with self.network() as net:
+ res = self._create_port_bulk('json', 2, net['network']['id'],
+ 'test', True)
+ self._validate_behavior_on_bulk_success(res, 'ports')
+
+ def test_create_ports_bulk_emulated(self):
+ real_has_attr = hasattr
+
+ #ensures the API choose the emulation code path
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ with self.network() as net:
+ res = self._create_port_bulk('json', 2, net['network']['id'],
+ 'test', True)
+ self._validate_behavior_on_bulk_success(res, 'ports')
+
+ def test_create_ports_bulk_wrong_input(self):
+ with self.network() as net:
+ overrides = {1: {'admin_state_up': 'doh'}}
+ res = self._create_port_bulk('json', 2, net['network']['id'],
+ 'test', True,
+ override=overrides)
+ self.assertEqual(res.status_int, 400)
+ req = self.new_list_request('ports')
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 200)
+ ports = self.deserialize('json', res)
+ self.assertEqual(len(ports['ports']), 0)
+
+ def test_create_ports_bulk_emulated_plugin_failure(self):
+ real_has_attr = hasattr
+
+ #ensures the API choose the emulation code path
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ orig = QuantumManager.get_plugin().create_port
+ with mock.patch.object(QuantumManager.get_plugin(),
+ 'create_port') as patched_plugin:
+
+ def side_effect(*args, **kwargs):
+ return self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ with self.network() as net:
+ res = self._create_port_bulk('json', 2,
+ net['network']['id'],
+ 'test',
+ True)
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'ports')
+
+ def test_create_ports_bulk_native_plugin_failure(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk port create")
+ ctx = context.get_admin_context()
+ with self.network() as net:
+ orig = QuantumManager._instance.plugin.create_port
+ with mock.patch.object(QuantumManager._instance.plugin,
+ 'create_port') as patched_plugin:
+
+ def side_effect(*args, **kwargs):
+ return self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ res = self._create_port_bulk('json', 2, net['network']['id'],
+ 'test', True, context=ctx)
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'ports')
+
def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int, 409)
+ def test_create_networks_bulk_native(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk network create")
+ res = self._create_network_bulk('json', 2, 'test', True)
+ self._validate_behavior_on_bulk_success(res, 'networks')
+
+ def test_create_networks_bulk_emulated(self):
+ real_has_attr = hasattr
+
+ #ensures the API choose the emulation code path
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ res = self._create_network_bulk('json', 2, 'test', True)
+ self._validate_behavior_on_bulk_success(res, 'networks')
+
+ def test_create_networks_bulk_wrong_input(self):
+ res = self._create_network_bulk('json', 2, 'test', True,
+ override={1:
+ {'admin_state_up': 'doh'}})
+ self.assertEqual(res.status_int, 400)
+ req = self.new_list_request('networks')
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 200)
+ nets = self.deserialize('json', res)
+ self.assertEqual(len(nets['networks']), 0)
+
+ def test_create_networks_bulk_emulated_plugin_failure(self):
+ real_has_attr = hasattr
+
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ orig = QuantumManager.get_plugin().create_network
+ #ensures the API choose the emulation code path
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ with mock.patch.object(QuantumManager.get_plugin(),
+ 'create_network') as patched_plugin:
+
+ def side_effect(*args, **kwargs):
+ return self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ res = self._create_network_bulk('json', 2, 'test', True)
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'networks')
+
+ def test_create_networks_bulk_native_plugin_failure(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk network create")
+ orig = QuantumManager.get_plugin().create_network
+ with mock.patch.object(QuantumManager.get_plugin(),
+ 'create_network') as patched_plugin:
+
+ def side_effect(*args, **kwargs):
+ return self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ res = self._create_network_bulk('json', 2, 'test', True)
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'networks')
+
def test_list_networks(self):
with self.network(name='net1') as net1:
with self.network(name='net2') as net2:
pass
self.assertEquals(ctx_manager.exception.code, 400)
+ def test_create_subnets_bulk_native(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk subnet create")
+ with self.network() as net:
+ res = self._create_subnet_bulk('json', 2, net['network']['id'],
+ 'test')
+ self._validate_behavior_on_bulk_success(res, 'subnets')
+
+ def test_create_subnets_bulk_emulated(self):
+ real_has_attr = hasattr
+
+ #ensures the API choose the emulation code path
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ with self.network() as net:
+ res = self._create_subnet_bulk('json', 2,
+ net['network']['id'],
+ 'test')
+ self._validate_behavior_on_bulk_success(res, 'subnets')
+
+ def test_create_subnets_bulk_emulated_plugin_failure(self):
+ real_has_attr = hasattr
+
+ #ensures the API choose the emulation code path
+ def fakehasattr(item, attr):
+ if attr.endswith('__native_bulk_support'):
+ return False
+ return real_has_attr(item, attr)
+
+ with mock.patch('__builtin__.hasattr',
+ new=fakehasattr):
+ orig = QuantumManager.get_plugin().create_subnet
+ with mock.patch.object(QuantumManager.get_plugin(),
+ 'create_subnet') as patched_plugin:
+
+ def side_effect(*args, **kwargs):
+ self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ with self.network() as net:
+ res = self._create_subnet_bulk('json', 2,
+ net['network']['id'],
+ 'test')
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'subnets')
+
+ def test_create_subnets_bulk_native_plugin_failure(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk subnet create")
+ orig = QuantumManager._instance.plugin.create_subnet
+ with mock.patch.object(QuantumManager._instance.plugin,
+ 'create_subnet') as patched_plugin:
+ def side_effect(*args, **kwargs):
+ return self._do_side_effect(patched_plugin, orig,
+ *args, **kwargs)
+
+ patched_plugin.side_effect = side_effect
+ with self.network() as net:
+ res = self._create_subnet_bulk('json', 2,
+ net['network']['id'],
+ 'test')
+
+ # We expect a 500 as we injected a fault in the plugin
+ self._validate_behavior_on_bulk_failure(res, 'subnets')
+
def test_delete_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'