raise e.errors[0].error
raise exc.ServicePortInUse(port_id=port_id, reason=e)
+ @oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
+ retry_on_deadlock=True)
def delete_port(self, context, id, l3_port_check=True):
self._pre_delete_port(context, id, l3_port_check)
# TODO(armax): get rid of the l3 dependency in the with block
# by the called method
self.assertIsNone(l3plugin.disassociate_floatingips(ctx, port_id))
+ def test_delete_port_tolerates_db_deadlock(self):
+ ctx = context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ with self.port() as port:
+ port_db, binding = ml2_db.get_locked_port_and_binding(
+ ctx.session, port['port']['id'])
+ with mock.patch('neutron.plugins.ml2.plugin.'
+ 'db.get_locked_port_and_binding') as lock:
+ lock.side_effect = [db_exc.DBDeadlock,
+ (port_db, binding)]
+ plugin.delete_port(ctx, port['port']['id'])
+ self.assertEqual(2, lock.call_count)
+ self.assertRaises(
+ exc.PortNotFound, plugin.get_port, ctx, port['port']['id'])
+
class TestMl2PluginOnly(Ml2PluginV2TestCase):
"""For testing methods that don't call drivers"""