From 658452478fb35f3200e5d62b23d328be72d35039 Mon Sep 17 00:00:00 2001 From: Roman Podolyaka Date: Tue, 30 Apr 2013 15:12:15 +0300 Subject: [PATCH] Fix a race condition in add_tunnel_endpoint() If there are multiple OVS agents concurrently executing 'tunnel_sync' RPC call a race condition can occur leading to insertion of two different TunnelEndpoint entries having the same 'id' value. Unfortunately, we can not rely on: - @lockutils.synchronized(), because a Neutron installation can use more than one API node - with_lockmode('update'), because it works differently in PostgreSQL comparing to MySQL and doesn't guarantee that no new rows have been added to the table since the select query was issued. Please take a look at http://www.postgresql.org/files/developer/concurrency.pdf for more details. The proposed fix: - ensures there is a unique constraint set for 'id' column - wraps creation of a new TunnelEndpoint entry into a repeatedly executed transactional block (so even if a concurrent DB transaction has been flushed or commited earlier than this one we can handle an integrity error and try again, in spite of the specified transactions isolation level value) Fixes bug 1167916 Change-Id: I62dc729d595f090436199d5e1b6b98a884ead7a5 --- ...afba73813_ovs_tunnelendpoints_id_unique.py | 64 +++++++++++++++++++ neutron/plugins/openvswitch/ovs_db_v2.py | 44 +++++++++---- neutron/plugins/openvswitch/ovs_models_v2.py | 4 ++ neutron/tests/unit/openvswitch/test_ovs_db.py | 28 ++++++++ 4 files changed, 128 insertions(+), 12 deletions(-) create mode 100644 neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py diff --git a/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py b/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py new file mode 100644 index 000000000..6ab361d8a --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/63afba73813_ovs_tunnelendpoints_id_unique.py @@ -0,0 +1,64 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Add unique constraint for id column of TunnelEndpoint + +Revision ID: 63afba73813 +Revises: 3c6e57a23db4 +Create Date: 2013-04-30 13:53:31.717450 + +""" + +# revision identifiers, used by Alembic. +revision = '63afba73813' +down_revision = '3c6e57a23db4' + +# Change to ['*'] if this migration applies to all plugins + +migration_for_plugins = [ + 'neutron.plugins.openvswitch.ovs_neutron_plugin.OVSNeutronPluginV2', +] + +from alembic import op + +from neutron.db import migration + + +CONSTRAINT_NAME = 'uniq_ovs_tunnel_endpoints0id' +TABLE_NAME = 'ovs_tunnel_endpoints' + + +def upgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.create_unique_constraint( + name=CONSTRAINT_NAME, + source=TABLE_NAME, + local_cols=['id'] + ) + + +def downgrade(active_plugins=None, options=None): + if not migration.should_run(active_plugins, migration_for_plugins): + return + + op.drop_constraint( + name=CONSTRAINT_NAME, + tablename=TABLE_NAME, + type='unique' + ) diff --git a/neutron/plugins/openvswitch/ovs_db_v2.py b/neutron/plugins/openvswitch/ovs_db_v2.py index 27b9032e4..4eb9015e2 100644 --- a/neutron/plugins/openvswitch/ovs_db_v2.py +++ b/neutron/plugins/openvswitch/ovs_db_v2.py @@ -16,8 +16,8 @@ # @author: Aaron Rosen, Nicira Networks, Inc. # @author: Bob Kukura, Red Hat, Inc. +from sqlalchemy import func from sqlalchemy.orm import exc -from sqlalchemy.sql import func from neutron.common import exceptions as q_exc import neutron.db.api as db @@ -25,6 +25,7 @@ from neutron.db import models_v2 from neutron.db import securitygroups_db as sg_db from neutron.extensions import securitygroup as ext_sg from neutron import manager +from neutron.openstack.common.db import exception as db_exc from neutron.openstack.common import log as logging from neutron.plugins.openvswitch.common import constants from neutron.plugins.openvswitch import ovs_models_v2 @@ -367,14 +368,33 @@ def _generate_tunnel_id(session): return max_tunnel_id + 1 -def add_tunnel_endpoint(ip): - session = db.get_session() - try: - tunnel = (session.query(ovs_models_v2.TunnelEndpoint). - filter_by(ip_address=ip).with_lockmode('update').one()) - except exc.NoResultFound: - tunnel_id = _generate_tunnel_id(session) - tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id) - session.add(tunnel) - session.flush() - return tunnel +def add_tunnel_endpoint(ip, max_retries=10): + """Return the endpoint of the given IP address or generate a new one.""" + + # NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a + # repeatedly executed transactional block to ensure it + # doesn't conflict with any other concurrently executed + # DB transactions in spite of the specified transactions + # isolation level value + for i in xrange(max_retries): + LOG.debug(_('Adding a tunnel endpoint for %s'), ip) + try: + session = db.get_session() + with session.begin(subtransactions=True): + tunnel = (session.query(ovs_models_v2.TunnelEndpoint). + filter_by(ip_address=ip).with_lockmode('update'). + first()) + + if tunnel is None: + tunnel_id = _generate_tunnel_id(session) + tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id) + session.add(tunnel) + + return tunnel + except db_exc.DBDuplicateEntry: + # a concurrent transaction has been commited, try again + LOG.debug(_('Adding a tunnel endpoint failed due to a concurrent' + 'transaction had been commited (%s attempts left)'), + max_retries - (i + 1)) + + raise q_exc.NeutronException(message='Unable to generate a new tunnel id') diff --git a/neutron/plugins/openvswitch/ovs_models_v2.py b/neutron/plugins/openvswitch/ovs_models_v2.py index 77a40a5ed..3ca34f1c2 100644 --- a/neutron/plugins/openvswitch/ovs_models_v2.py +++ b/neutron/plugins/openvswitch/ovs_models_v2.py @@ -18,6 +18,7 @@ from sqlalchemy import Boolean, Column, ForeignKey, Integer, String +from sqlalchemy.schema import UniqueConstraint from neutron.db.models_v2 import model_base @@ -86,6 +87,9 @@ class NetworkBinding(model_base.BASEV2): class TunnelEndpoint(model_base.BASEV2): """Represents tunnel endpoint in RPC mode.""" __tablename__ = 'ovs_tunnel_endpoints' + __table_args__ = ( + UniqueConstraint('id', name='uniq_ovs_tunnel_endpoints0id'), + ) ip_address = Column(String(64), primary_key=True) id = Column(Integer, nullable=False) diff --git a/neutron/tests/unit/openvswitch/test_ovs_db.py b/neutron/tests/unit/openvswitch/test_ovs_db.py index 8831cca07..ca8d4dc0d 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_db.py +++ b/neutron/tests/unit/openvswitch/test_ovs_db.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import testtools from testtools import matchers from neutron.common import exceptions as q_exc from neutron.db import api as db +from neutron.openstack.common.db import exception as db_exc +from neutron.openstack.common.db.sqlalchemy import session from neutron.plugins.openvswitch import ovs_db_v2 +from neutron.plugins.openvswitch import ovs_models_v2 as ovs_models from neutron.tests import base from neutron.tests.unit import test_db_plugin as test_plugin @@ -262,6 +266,30 @@ class TunnelAllocationsTest(base.BaseTestCase): ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES) self.assertIsNone(ovs_db_v2.get_tunnel_allocation(tunnel_id)) + def test_add_tunnel_endpoint_create_new_endpoint(self): + addr = '10.0.0.1' + ovs_db_v2.add_tunnel_endpoint(addr) + self.assertIsNotNone(self.session.query(ovs_models.TunnelEndpoint). + filter_by(ip_address=addr).first()) + + def test_add_tunnel_endpoint_retrieve_an_existing_endpoint(self): + addr = '10.0.0.1' + self.session.add(ovs_models.TunnelEndpoint(ip_address=addr, id=1)) + self.session.flush() + + tunnel = ovs_db_v2.add_tunnel_endpoint(addr) + self.assertEquals(tunnel.id, 1) + self.assertEquals(tunnel.ip_address, addr) + + def test_add_tunnel_endpoint_handle_duplicate_error(self): + with mock.patch.object(session.Session, 'query') as query_mock: + error = db_exc.DBDuplicateEntry(['id']) + query_mock.side_effect = error + + with testtools.ExpectedException(q_exc.NeutronException): + ovs_db_v2.add_tunnel_endpoint('10.0.0.1', 5) + self.assertEquals(query_mock.call_count, 5) + class NetworkBindingsTest(test_plugin.NeutronDbPluginV2TestCase): def setUp(self): -- 2.45.2