Even though we check for a duplicate IP before creating an IP
allocation, there is a window between when that check completes
and when the record is committed to the database. During this window,
another thread or server may claim the same IP address, causing the
database commit to fail with a DBDuplicateEntry exception.
This patch adds a context manager that converts the exception into a
RetryRequest, which is caught at the API layer to restart the
operation; at that point the normal duplicate check will find
the IP and return the correct error to the user.
This was done instead of trying to convert the DBDuplicateEntry into
the correct user-facing exception because the failure happens at
commit time, when there are many possible sources of the duplicate
record beyond the IP allocation (depending on what mechanism drivers
did during pre-commit, etc.). By retrying the request, we ensure that
the pre-checks run again and raise the appropriate exception.
Change-Id: I37a964497bf60a61bc49bdeec94a008f167c384f
Closes-Bug: #1534447
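
For context, the RetryRequest raised by the new context manager is
expected to be handled by oslo.db's retry machinery wrapping the call
at the API layer. A minimal sketch of that handling, assuming a
wrap_db_retry decorator with an illustrative function name and retry
count (not the exact Neutron wiring):

    from oslo_db import api as oslo_db_api


    # Re-run the wrapped call whenever a RetryRequest escapes it, for
    # example from exc_to_retry below. The function name and retry
    # count here are illustrative assumptions.
    @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_request=True)
    def create_port_with_retry(plugin, context, port):
        return plugin.create_port(context, port)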
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session
+from oslo_utils import excutils
from oslo_utils import uuidutils
from sqlalchemy import exc
)
+@contextlib.contextmanager
+def exc_to_retry(exceptions):
+    """Translate the given exceptions into a RetryRequest on the way out."""
+    try:
+        yield
+    except Exception as e:
+        with excutils.save_and_reraise_exception() as ctx:
+            if isinstance(e, exceptions):
+                ctx.reraise = False
+                raise db_exc.RetryRequest(e)
+
+
def _create_facade_lazily():
    global _FACADE
        attrs['status'] = const.PORT_STATUS_DOWN
        session = context.session
-        with session.begin(subtransactions=True):
+        with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
+                session.begin(subtransactions=True):
            dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
            result = super(Ml2Plugin, self).create_port(context, port)
            self.extension_manager.process_create_port(context, attrs, result)
        session = context.session
        bound_mech_contexts = []
-        with session.begin(subtransactions=True):
+        with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
+                session.begin(subtransactions=True):
            port_db, binding = db.get_locked_port_and_binding(session, id)
            if not port_db:
                raise exc.PortNotFound(port_id=id)
--- /dev/null
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_db import exception as db_exc
+import testtools
+
+from neutron.db import api as db_api
+from neutron.tests import base
+
+
+class TestExceptionToRetryContextManager(base.BaseTestCase):
+
+    def test_translates_single_exception(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry(ValueError):
+                raise ValueError()
+
+    def test_translates_multiple_exception_types(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry((ValueError, TypeError)):
+                raise TypeError()
+
+    def test_passes_other_exceptions(self):
+        with testtools.ExpectedException(ValueError):
+            with db_api.exc_to_retry(TypeError):
+                raise ValueError()
+
+    def test_inner_exception_preserved_in_retryrequest(self):
+        try:
+            exc = ValueError('test')
+            with db_api.exc_to_retry(ValueError):
+                raise exc
+        except db_exc.RetryRequest as e:
+            self.assertEqual(exc, e.inner_exc)
        self.assertRaises(
            exc.PortNotFound, plugin.get_port, ctx, port['port']['id'])
+    def test_port_create_resilient_to_duplicate_records(self):
+
+        def make_port():
+            with self.port():
+                pass
+
+        self._test_operation_resilient_to_ipallocation_failure(make_port)
+
+    def test_port_update_resilient_to_duplicate_records(self):
+        with self.port() as p:
+            data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
+            req = self.new_update_request('ports', data, p['port']['id'])
+
+            def do_request():
+                self.assertEqual(200, req.get_response(self.api).status_int)
+
+            self._test_operation_resilient_to_ipallocation_failure(do_request)
+
+    def _test_operation_resilient_to_ipallocation_failure(self, func):
+        from sqlalchemy import event
+
+        class IPAllocationsGrenade(object):
+            insert_ip_called = False
+            except_raised = False
+
+            def execute(self, con, curs, stmt, *args, **kwargs):
+                # remember that the IP allocation INSERT was issued so the
+                # commit hook knows when to blow up
+                if 'INSERT INTO ipallocations' in stmt:
+                    self.insert_ip_called = True
+
+            def commit(self, con):
+                # we blow up on commit to simulate another thread/server
+                # stealing our IP before our transaction was done
+                if self.insert_ip_called and not self.except_raised:
+                    self.except_raised = True
+                    raise db_exc.DBDuplicateEntry()
+
+        listener = IPAllocationsGrenade()
+        engine = db_api.get_engine()
+        event.listen(engine, 'before_cursor_execute', listener.execute)
+        event.listen(engine, 'commit', listener.commit)
+        self.addCleanup(event.remove, engine, 'before_cursor_execute',
+                        listener.execute)
+        self.addCleanup(event.remove, engine, 'commit',
+                        listener.commit)
+        func()
+        # make sure that the grenade went off during the commit
+        self.assertTrue(listener.except_raised)
+
class TestMl2PluginOnly(Ml2PluginV2TestCase):
    """For testing methods that don't call drivers"""