If there are multiple OVS agents concurrently executing
'tunnel_sync' RPC call a race condition can occur
leading to insertion of two different TunnelEndpoint
entries having the same 'id' value.
Unfortunately, we can not rely on:
- @lockutils.synchronized(), because a Neutron installation can use
more than one API node
- with_lockmode('update'), because it works differently in PostgreSQL
comparing to MySQL and doesn't guarantee that no new rows have been
added to the table since the select query was issued. Please take a
look at http://www.postgresql.org/files/developer/concurrency.pdf for
more details.
The proposed fix:
- ensures there is a unique constraint set for 'id' column
- wraps creation of a new TunnelEndpoint entry into a
repeatedly executed transactional block (so even if a concurrent
DB transaction has been flushed or commited earlier than this one
we can handle an integrity error and try again, in spite of the
specified transactions isolation level value)
Fixes bug
1167916
Change-Id: I62dc729d595f090436199d5e1b6b98a884ead7a5
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Add unique constraint for id column of TunnelEndpoint
+
+Revision ID: 63afba73813
+Revises: 3c6e57a23db4
+Create Date: 2013-04-30 13:53:31.717450
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '63afba73813'
+down_revision = '3c6e57a23db4'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2',
+]
+
+from alembic import op
+
+from neutron.db import migration
+
+
+CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id'
+TABLE_NAME = 'ovs_tunnel_endpoints'
+
+
+def upgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.create_unique_constraint(
+ name=CONSTRAINT_NAME,
+ source=TABLE_NAME,
+ local_cols=['id']
+ )
+
+
+def downgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.drop_constraint(
+ name=CONSTRAINT_NAME,
+ tablename=TABLE_NAME,
+ type='unique'
+ )
# @author: Aaron Rosen, Nicira Networks, Inc.
# @author: Bob Kukura, Red Hat, Inc.
+from sqlalchemy import func
from sqlalchemy.orm import exc
-from sqlalchemy.sql import func
from neutron.common import exceptions as q_exc
import neutron.db.api as db
from neutron.db import securitygroups_db as sg_db
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
+from neutron.openstack.common.db import exception as db_exc
from neutron.openstack.common import log as logging
from neutron.plugins.openvswitch.common import constants
from neutron.plugins.openvswitch import ovs_models_v2
return max_tunnel_id + 1
-def add_tunnel_endpoint(ip):
- session = db.get_session()
- try:
- tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
- filter_by(ip_address=ip).with_lockmode('update').one())
- except exc.NoResultFound:
- tunnel_id = _generate_tunnel_id(session)
- tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
- session.add(tunnel)
- session.flush()
- return tunnel
+def add_tunnel_endpoint(ip, max_retries=10):
+ """Return the endpoint of the given IP address or generate a new one."""
+
+ # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
+ # repeatedly executed transactional block to ensure it
+ # doesn't conflict with any other concurrently executed
+ # DB transactions in spite of the specified transactions
+ # isolation level value
+ for i in xrange(max_retries):
+ LOG.debug(_('Adding a tunnel endpoint for %s'), ip)
+ try:
+ session = db.get_session()
+ with session.begin(subtransactions=True):
+ tunnel = (session.query(ovs_models_v2.TunnelEndpoint).
+ filter_by(ip_address=ip).with_lockmode('update').
+ first())
+
+ if tunnel is None:
+ tunnel_id = _generate_tunnel_id(session)
+ tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
+ session.add(tunnel)
+
+ return tunnel
+ except db_exc.DBDuplicateEntry:
+ # a concurrent transaction has been commited, try again
+ LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent'
+ 'transaction had been commited (%s attempts left)'),
+ max_retries - (i + 1))
+
+ raise q_exc.NeutronException(message='Unable to generate a new tunnel id')
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+from sqlalchemy.schema import UniqueConstraint
from neutron.db.models_v2 import model_base
class TunnelEndpoint(model_base.BASEV2):
"""Represents tunnel endpoint in RPC mode."""
__tablename__ = 'ovs_tunnel_endpoints'
+ __table_args__ = (
+ UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'),
+ )
ip_address = Column(String(64), primary_key=True)
id = Column(Integer, nullable=False)
# See the License for the specific language governing permissions and
# limitations under the License.
+import mock
import testtools
from testtools import matchers
from neutron.common import exceptions as q_exc
from neutron.db import api as db
+from neutron.openstack.common.db import exception as db_exc
+from neutron.openstack.common.db.sqlalchemy import session
from neutron.plugins.openvswitch import ovs_db_v2
+from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models
from neutron.tests import base
from neutron.tests.unit import test_db_plugin as test_plugin
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id))
+ def test_add_tunnel_endpoint_create_new_endpoint(self):
+ addr = '10.0.0.1'
+ ovs_db_v2.add_tunnel_endpoint(addr)
+ self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint).
+ filter_by(ip_address=addr).first())
+
+ def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self):
+ addr = '10.0.0.1'
+ self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1))
+ self.session.flush()
+
+ tunnel = ovs_db_v2.add_tunnel_endpoint(addr)
+ self.assertEquals(tunnel.id, 1)
+ self.assertEquals(tunnel.ip_address, addr)
+
+ def test_add_tunnel_endpoint_handle_duplicate_error(self):
+ with mock.patch.object(session.Session, 'query') as query_mock:
+ error = db_exc.DBDuplicateEntry(['id'])
+ query_mock.side_effect = error
+
+ with testtools.ExpectedException(q_exc.NeutronException):
+ ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5)
+ self.assertEquals(query_mock.call_count, 5)
+
class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):