{'res': resource,
'id': obj['result']['id']})
- @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
- retry_on_request=True)
def _create_bulk_ml2(self, resource, context, request_items):
objects = []
collection = "%ss" % resource
return result, mech_context
- @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
- retry_on_request=True)
- def _create_network_with_retries(self, context, network):
- return self._create_network_db(context, network)
-
def create_network(self, context, network):
- result, mech_context = self._create_network_with_retries(context,
- network)
+ result, mech_context = self._create_network_db(context, network)
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
self.mechanism_manager.update_subnet_postcommit(mech_context)
return updated_subnet
- @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
- retry_on_request=True)
def delete_subnet(self, context, id):
# REVISIT(rkukura) The super(Ml2Plugin, self).delete_subnet()
# function is not used because it deallocates the subnet's addresses
return result, mech_context
- @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
- retry_on_request=True)
def create_port(self, context, port):
attrs = port[attributes.PORT]
result, mech_context = self._create_port_db(context, port)
def test_create_network_segment_allocation_fails(self):
plugin = manager.NeutronManager.get_plugin()
- with mock.patch.object(plugin.type_manager, 'create_network_segments',
- side_effect=db_exc.RetryRequest(ValueError())) as f:
- self.assertRaises(ValueError,
- plugin.create_network,
- context.get_admin_context(),
- {'network': {'tenant_id': 'sometenant',
- 'name': 'dummy',
- 'admin_state_up': True,
- 'shared': False}})
+ with mock.patch.object(
+ plugin.type_manager, 'create_network_segments',
+ side_effect=db_exc.RetryRequest(ValueError())
+ ) as f:
+ data = {'network': {'tenant_id': 'sometenant', 'name': 'dummy',
+ 'admin_state_up': True, 'shared': False}}
+ self.new_create_request('networks', data).get_response(self.api)
self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)