]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix a race condition in add_tunnel_endpoint()
authorRoman Podolyaka <rpodolyaka@mirantis.com>
Tue, 30 Apr 2013 12:12:15 +0000 (15:12 +0300)
committerMark McClain <mark.mcclain@dreamhost.com>
Thu, 29 Aug 2013 04:37:58 +0000 (00:37 -0400)
If there are multiple OVS agents concurrently executing
'tunnel_sync' RPC call a race condition can occur
leading to insertion of two different TunnelEndpoint
entries having the same 'id' value.

Unfortunately, we can not rely on:
  - @lockutils.synchronized(), because a Neutron installation can use
    more than one API node
  - with_lockmode('update'), because it works differently in PostgreSQL
    comparing to MySQL and doesn't guarantee that no new rows have been
    added to the table since the select query was issued. Please take a
    look at http://www.postgresql.org/files/developer/concurrency.pdf for
    more details.

The proposed fix:
  - ensures there is a unique constraint set for 'id' column
  - wraps creation of a new TunnelEndpoint entry into a
    repeatedly executed transactional block (so even if a concurrent
    DB transaction has been flushed or commited earlier than this one
    we can handle an integrity error and try again, in spite of the
    specified transactions isolation level value)

Fixes bug 1167916

Change-Id: I62dc729d595f090436199d5e1b6b98a884ead7a5

neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py [new file with mode: 0644]
neutron/plugins/openvswitch/ovs_db_v2.py
neutron/plugins/openvswitch/ovs_models_v2.py
neutron/tests/unit/openvswitch/test_ovs_db.py

diff --git a/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py b/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py
new file mode 100644 (file)
index 0000000..6ab361d
--- /dev/null
@@ -0,0 +1,64 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""Add unique constraint for id column of TunnelEndpoint
+
+Revision ID: 63afba73813
+Revises: 3c6e57a23db4
+Create Date: 2013-04-30 13:53:31.717450
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '63afba73813'
+down_revision = '3c6e57a23db4'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+    'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2',
+]
+
+from alembic import op
+
+from neutron.db import migration
+
+
+CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id'
+TABLE_NAME = 'ovs_tunnel_endpoints'
+
+
+def upgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.create_unique_constraint(
+        name=CONSTRAINT_NAME,
+        source=TABLE_NAME,
+        local_cols=['id']
+    )
+
+
+def downgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.drop_constraint(
+        name=CONSTRAINT_NAME,
+        tablename=TABLE_NAME,
+        type='unique'
+    )
index 27b9032e40b4e9be915f7cefd7241596852d0425..4eb9015e2187522f6b284c386d4f4d55497e3714 100644 (file)
@@ -16,8 +16,8 @@
 # @author: Aaron Rosen, Nicira Networks, Inc.
 # @author: Bob Kukura, Red Hat, Inc.
 
+from sqlalchemy import func
 from sqlalchemy.orm import exc
-from sqlalchemy.sql import func
 
 from neutron.common import exceptions as q_exc
 import neutron.db.api as db
@@ -25,6 +25,7 @@ from neutron.db import models_v2
 from neutron.db import securitygroups_db as sg_db
 from neutron.extensions import securitygroup as ext_sg
 from neutron import manager
+from neutron.openstack.common.db import exception as db_exc
 from neutron.openstack.common import log as logging
 from neutron.plugins.openvswitch.common import constants
 from neutron.plugins.openvswitch import ovs_models_v2
@@ -367,14 +368,33 @@ def _generate_tunnel_id(session):
     return max_tunnel_id + 1
 
 
-def add_tunnel_endpoint(ip):
-    session = db.get_session()
-    try:
-        tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
-                  filter_by(ip_address=ip).with_lockmode('update').one())
-    except exc.NoResultFound:
-        tunnel_id = _generate_tunnel_id(session)
-        tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
-        session.add(tunnel)
-        session.flush()
-    return tunnel
+def add_tunnel_endpoint(ip, max_retries=10):
+    """Return the endpoint of the given IP address or generate a new one."""
+
+    # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
+    #                   repeatedly executed transactional block to ensure it
+    #                   doesn't conflict with any other concurrently executed
+    #                   DB transactions in spite of the specified transactions
+    #                   isolation level value
+    for i in xrange(max_retries):
+        LOG.debug(_('Adding a tunnel endpoint for %s'), ip)
+        try:
+            session = db.get_session()
+            with session.begin(subtransactions=True):
+                tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
+                          filter_by(ip_address=ip).with_lockmode('update').
+                          first())
+
+                if tunnel is None:
+                    tunnel_id = _generate_tunnel_id(session)
+                    tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
+                    session.add(tunnel)
+
+                return tunnel
+        except db_exc.DBDuplicateEntry:
+            # a concurrent transaction has been commited, try again
+            LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent'
+                        'transaction had been commited (%s attempts left)'),
+                      max_retries - (i + 1))
+
+    raise q_exc.NeutronException(message='Unable to generate a new tunnel id')
index 77a40a5ed488efe64f77917488c9f19f3f1cdb37..3ca34f1c2040f728b83effc8072a014a8d0936a7 100644 (file)
@@ -18,6 +18,7 @@
 
 
 from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+from sqlalchemy.schema import UniqueConstraint
 
 from neutron.db.models_v2 import model_base
 
@@ -86,6 +87,9 @@ class NetworkBinding(model_base.BASEV2):
 class TunnelEndpoint(model_base.BASEV2):
     """Represents tunnel endpoint in RPC mode."""
     __tablename__ = 'ovs_tunnel_endpoints'
+    __table_args__ = (
+        UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'),
+    )
 
     ip_address = Column(String(64), primary_key=True)
     id = Column(Integer, nullable=False)
index 8831cca072f0d34a0b6374fc59d2e7087c104e06..ca8d4dc0d7398893f47619ecdc4a1c38a171d571 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import mock
 import testtools
 from testtools import matchers
 
 from neutron.common import exceptions as q_exc
 from neutron.db import api as db
+from neutron.openstack.common.db import exception as db_exc
+from neutron.openstack.common.db.sqlalchemy import session
 from neutron.plugins.openvswitch import ovs_db_v2
+from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models
 from neutron.tests import base
 from neutron.tests.unit import test_db_plugin as test_plugin
 
@@ -262,6 +266,30 @@ class TunnelAllocationsTest(base.BaseTestCase):
         ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
         self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id))
 
+    def test_add_tunnel_endpoint_create_new_endpoint(self):
+        addr = '10.0.0.1'
+        ovs_db_v2.add_tunnel_endpoint(addr)
+        self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint).
+                             filter_by(ip_address=addr).first())
+
+    def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self):
+        addr = '10.0.0.1'
+        self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1))
+        self.session.flush()
+
+        tunnel = ovs_db_v2.add_tunnel_endpoint(addr)
+        self.assertEquals(tunnel.id, 1)
+        self.assertEquals(tunnel.ip_address, addr)
+
+    def test_add_tunnel_endpoint_handle_duplicate_error(self):
+        with mock.patch.object(session.Session, 'query') as query_mock:
+            error = db_exc.DBDuplicateEntry(['id'])
+            query_mock.side_effect = error
+
+            with testtools.ExpectedException(q_exc.NeutronException):
+                ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5)
+            self.assertEquals(query_mock.call_count, 5)
+
 
 class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase):
     def setUp(self):