]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix a race condition in agents status update code
authorRoman Podoliaka <rpodolyaka@mirantis.com>
Wed, 27 Nov 2013 16:57:56 +0000 (18:57 +0200)
committerRoman Podoliaka <rpodolyaka@mirantis.com>
Mon, 9 Dec 2013 13:18:34 +0000 (15:18 +0200)
Code handling agents status updates coming via RPC checks,
if a corresponding entry for the given (agent_type, host)
pair already exists in DB and updates it. And if it doesn't
exist, a new entry is created.

Without a unique constraint this can cause a race condition
resulting in adding of two agent entries having the same value
of (agent_type, host) pair.

Note, that it's already not allowed to have multiple agents of
the same type having the same host value, but currently it's
enforced only at code level, not at DB schema level, which
effectively makes race conditions possible.

Closes-Bug: #1254246

Change-Id: I1ebaa111154b3d6b34074705b579097ab730594c

neutron/db/agents_db.py
neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py [new file with mode: 0644]
neutron/tests/unit/db/test_agent_db.py [new file with mode: 0644]

index e095a4c2a7467670e21265efd37385fc90420b6c..9a574a3f27df477c41b77870fff2fdbae423ac23 100644 (file)
@@ -25,6 +25,7 @@ from neutron.db import model_base
 from neutron.db import models_v2
 from neutron.extensions import agent as ext_agent
 from neutron import manager
+from neutron.openstack.common.db import exception as db_exc
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import timeutils
@@ -40,6 +41,11 @@ cfg.CONF.register_opt(
 class Agent(model_base.BASEV2, models_v2.HasId):
     """Represents agents running in neutron deployments."""
 
+    __table_args__ = (
+        sa.UniqueConstraint('agent_type', 'host',
+                            name='uniq_agents0agent_type0host'),
+    )
+
     # L3 agent, DHCP agent, OVS agent, LinuxBridge
     agent_type = sa.Column(sa.String(255), nullable=False)
     binary = sa.Column(sa.String(255), nullable=False)
@@ -135,8 +141,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
         agent = self._get_agent(context, id)
         return self._make_agent_dict(agent, fields)
 
-    def create_or_update_agent(self, context, agent):
-        """Create or update agent according to report."""
+    def _create_or_update_agent(self, context, agent):
         with context.session.begin(subtransactions=True):
             res_keys = ['agent_type', 'binary', 'host', 'topic']
             res = dict((k, agent[k]) for k in res_keys)
@@ -163,6 +168,28 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
                 context.session.add(agent_db)
             greenthread.sleep(0)
 
+    def create_or_update_agent(self, context, agent):
+        """Create or update agent according to report."""
+
+        try:
+            return self._create_or_update_agent(context, agent)
+        except db_exc.DBDuplicateEntry as e:
+            if e.columns == ['agent_type', 'host']:
+                # It might happen that two or more concurrent transactions are
+                # trying to insert new rows having the same value of
+                # (agent_type, host) pair at the same time (if there has been
+                # no such entry in the table and multiple agent status updates
+                # are being processed at the moment). In this case having a
+                # unique constraint on (agent_type, host) columns guarantees
+                # that only one transaction will succeed and insert a new agent
+                # entry, others will fail and be rolled back. That means we
+                # must retry them one more time: no INSERTs will be issued,
+                # because _get_agent_by_type_and_host() will return the
+                # existing agent entry, which will be updated multiple times
+                return self._create_or_update_agent(context, agent)
+
+            raise
+
 
 class AgentExtRpcCallback(object):
     """Processes the rpc report in plugin implementations."""
diff --git a/neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py b/neutron/db/migration/alembic_migrations/versions/1fcfc149aca4_agents_unique_by_type_and_host.py
new file mode 100644 (file)
index 0000000..12d5287
--- /dev/null
@@ -0,0 +1,62 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""Add a unique constraint on (agent_type, host) columns to prevent a race
+condition when an agent entry is 'upserted'.
+
+Revision ID: 1fcfc149aca4
+Revises: e197124d4b9
+Create Date: 2013-11-27 18:35:28.148680
+
+"""
+
+revision = '1fcfc149aca4'
+down_revision = 'e197124d4b9'
+
+migration_for_plugins = [
+    '*'
+]
+
+from alembic import op
+
+from neutron.db import migration
+
+
+TABLE_NAME = 'agents'
+UC_NAME = 'uniq_agents0agent_type0host'
+
+
+def upgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.create_unique_constraint(
+        name=UC_NAME,
+        source=TABLE_NAME,
+        local_cols=['agent_type', 'host']
+    )
+
+
+def downgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.drop_constraint(
+        name=UC_NAME,
+        table_name=TABLE_NAME,
+        type_='unique'
+    )
diff --git a/neutron/tests/unit/db/test_agent_db.py b/neutron/tests/unit/db/test_agent_db.py
new file mode 100644 (file)
index 0000000..e3dc5ee
--- /dev/null
@@ -0,0 +1,86 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright (c) 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from neutron import context
+from neutron.db import agents_db
+from neutron.db import api as db
+from neutron.db import db_base_plugin_v2 as base_plugin
+from neutron.openstack.common.db import exception as exc
+from neutron.tests import base
+
+
+class FakePlugin(base_plugin.NeutronDbPluginV2, agents_db.AgentDbMixin):
+    """A fake plugin class containing all DB methods."""
+
+
+class TestAgentsDbMixin(base.BaseTestCase):
+    def setUp(self):
+        super(TestAgentsDbMixin, self).setUp()
+
+        self.context = context.get_admin_context()
+        self.plugin = FakePlugin()
+        self.addCleanup(db.clear_db)
+
+        self.agent_status = {
+            'agent_type': 'Open vSwitch agent',
+            'binary': 'neutron-openvswitch-agent',
+            'host': 'overcloud-notcompute',
+            'topic': 'N/A'
+        }
+
+    def _assert_ref_fields_are_equal(self, reference, result):
+        """Compare (key, value) pairs of a reference dict with the result
+
+           Note: the result MAY have additional keys
+        """
+
+        for field, value in reference.items():
+            self.assertEqual(value, result[field], field)
+
+    def test_create_or_update_agent_new_entry(self):
+        self.plugin.create_or_update_agent(self.context, self.agent_status)
+
+        agent = self.plugin.get_agents(self.context)[0]
+        self._assert_ref_fields_are_equal(self.agent_status, agent)
+
+    def test_create_or_update_agent_existing_entry(self):
+        self.plugin.create_or_update_agent(self.context, self.agent_status)
+        self.plugin.create_or_update_agent(self.context, self.agent_status)
+        self.plugin.create_or_update_agent(self.context, self.agent_status)
+
+        agents = self.plugin.get_agents(self.context)
+        self.assertEqual(len(agents), 1)
+
+        agent = agents[0]
+        self._assert_ref_fields_are_equal(self.agent_status, agent)
+
+    def test_create_or_update_agent_concurrent_insert(self):
+        # NOTE(rpodolyaka): emulate violation of the unique constraint caused
+        #                   by a concurrent insert. Ensure we make another
+        #                   attempt on fail
+        with mock.patch('sqlalchemy.orm.Session.add') as add_mock:
+            add_mock.side_effect = [
+                exc.DBDuplicateEntry(columns=['agent_type', 'host']),
+                None
+            ]
+
+            self.plugin.create_or_update_agent(self.context, self.agent_status)
+
+            self.assertEqual(add_mock.call_count, 2,
+                             "Agent entry creation hasn't been retried")