review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Big Switch: Lock consistency table for REST calls
author: Kevin Benton <blak111@gmail.com>
Fri, 6 Jun 2014 21:15:46 +0000 (14:15 -0700)
committer: Kevin Benton <blak111@gmail.com>
Wed, 18 Jun 2014 15:09:42 +0000 (08:09 -0700)
Adds a new class to hold an SQL table lock throughout
a REST call to prevent servers in an HA deployment from
using a stale consistency hash value.

Also passes the current context down to the server manager
for every request since the consistency hash will need to
be read from the database instead of from a variable on the
server pool object.

Closes-Bug: #1328331
Change-Id: I5f8402c076d7732742c0ae4d9b9d6833d42a0b7b

neutron/plugins/bigswitch/db/consistency_db.py
neutron/plugins/bigswitch/plugin.py
neutron/plugins/bigswitch/servermanager.py
neutron/tests/unit/bigswitch/test_servermanager.py

index cd89a26906f4abb264b6db034cca2046e7d2b644..4d1a1db792431c2873c65e42f558f417a6124f3e 100644 (file)
 #    under the License.
 import sqlalchemy as sa
 
+from neutron.common import exceptions
 from neutron.db import api as db
 from neutron.db import model_base
 from neutron.openstack.common import log as logging
 
 LOG = logging.getLogger(__name__)
 
-'''
-A simple table to store the latest consistency hash
-received from a server in case neutron gets restarted.
-'''
+
+class MultipleReadForUpdateCalls(exceptions.NeutronException):
+    message = _("Only one read_for_update call may be made at a time.")
 
 
 class ConsistencyHash(model_base.BASEV2):
     '''
+    A simple table to store the latest consistency hash
+    received from a server.
     For now we only support one global state so the
     hash_id will always be '1'
     '''
@@ -37,20 +39,44 @@ class ConsistencyHash(model_base.BASEV2):
     hash = sa.Column(sa.String(255), nullable=False)
 
 
-def get_consistency_hash(hash_id='1'):
-    session = db.get_session()
-    with session.begin(subtransactions=True):
-        query = session.query(ConsistencyHash)
-        res = query.filter_by(hash_id=hash_id).first()
-    if not res:
-        return False
-    return res.hash
+class HashHandler(object):
+    '''
+    A wrapper object to keep track of the session and hold the SQL
+    lock between the read and the update to prevent other servers
+    from reading the hash during a transaction.
+    '''
+    def __init__(self, context=None, hash_id='1'):
+        self.hash_id = hash_id
+        self.session = db.get_session() if not context else context.session
+        self.hash_db_obj = None
+        self.transaction = None
 
+    def read_for_update(self):
+        if self.transaction:
+            raise MultipleReadForUpdateCalls()
+        self.transaction = self.session.begin(subtransactions=True)
+        # Lock for update here to prevent another server from reading the hash
+        # while this one is in the middle of a transaction.
+        # This may not lock the SQL table in MySQL Galera deployments
+        # but that's okay because the worst case is a double-sync
+        res = (self.session.query(ConsistencyHash).
+               filter_by(hash_id=self.hash_id).
+               with_lockmode('update').first())
+        if not res:
+            return ''
+        self.hash_db_obj = res
+        return res.hash
 
-def put_consistency_hash(hash, hash_id='1'):
-    session = db.get_session()
-    with session.begin(subtransactions=True):
-        conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
-        session.merge(conhash)
+    def put_hash(self, hash):
+        hash = hash or ''
+        if not self.transaction:
+            self.transaction = self.session.begin(subtransactions=True)
+        if self.hash_db_obj is not None:
+            self.hash_db_obj.hash = hash
+        else:
+            conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
+            self.session.merge(conhash)
+        self.transaction.commit()
+        self.transaction = None
         LOG.debug(_("Consistency hash for group %(hash_id)s updated "
-                    "to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
+                    "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash})
index 9249f5d6b0215d65ea15e47fe5ae7e37c3395721..44ea5e4fad5fb50809856ca0b593b5a188eb1679 100644 (file)
@@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
 """
 
 import copy
+import functools
 import httplib
 import re
 
@@ -448,6 +449,14 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
             raise exceptions.PortNotFound(port_id=port_id)
 
 
+def put_context_in_serverpool(f):
+    @functools.wraps(f)
+    def wrapper(self, context, *args, **kwargs):
+        self.servers.set_context(context)
+        return f(self, context, *args, **kwargs)
+    return wrapper
+
+
 class NeutronRestProxyV2(NeutronRestProxyV2Base,
                          addr_pair_db.AllowedAddressPairsMixin,
                          extradhcpopt_db.ExtraDhcpOptMixin,
@@ -514,6 +523,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
 
+    @put_context_in_serverpool
     def create_network(self, context, network):
         """Create a network.
 
@@ -557,6 +567,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         # return created network
         return new_net
 
+    @put_context_in_serverpool
     def update_network(self, context, net_id, network):
         """Updates the properties of a particular Virtual Network.
 
@@ -596,6 +607,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_network(self, context, net_id):
         """Delete a network.
         :param context: neutron api request context
@@ -618,6 +630,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._send_delete_network(orig_net, context)
             return ret_val
 
+    @put_context_in_serverpool
     def create_port(self, context, port):
         """Create a port, which is a connection point of a device
         (e.g., a VM NIC) to attach to a L2 Neutron network.
@@ -708,6 +721,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
                 self._extend_port_dict_binding(context, port)
         return [self._fields(port, fields) for port in ports]
 
+    @put_context_in_serverpool
     def update_port(self, context, port_id, port):
         """Update values of a port.
 
@@ -784,6 +798,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_port(self, context, port_id, l3_port_check=True):
         """Delete a port.
         :param context: neutron api request context
@@ -809,6 +824,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._delete_port(context, port_id)
             self.servers.rest_delete_port(tenid, port['network_id'], port_id)
 
+    @put_context_in_serverpool
     def create_subnet(self, context, subnet):
         LOG.debug(_("NeutronRestProxyV2: create_subnet() called"))
 
@@ -825,6 +841,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             self._send_update_network(orig_net, context)
         return new_subnet
 
+    @put_context_in_serverpool
     def update_subnet(self, context, id, subnet):
         LOG.debug(_("NeutronRestProxyV2: update_subnet() called"))
 
@@ -843,6 +860,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_subnet(self, context, id):
         LOG.debug(_("NeutronRestProxyV2: delete_subnet() called"))
         orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id)
@@ -881,6 +899,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             return tenantset
         return defaultset
 
+    @put_context_in_serverpool
     def create_router(self, context, router):
         LOG.debug(_("NeutronRestProxyV2: create_router() called"))
 
@@ -902,6 +921,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             # return created router
             return new_router
 
+    @put_context_in_serverpool
     def update_router(self, context, router_id, router):
 
         LOG.debug(_("NeutronRestProxyV2.update_router() called"))
@@ -925,6 +945,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
     # NOTE(kevinbenton): workaround for eventlet/mysql deadlock.
     # delete_router ends up calling _delete_port instead of delete_port.
     @utils.synchronized('bsn-port-barrier')
+    @put_context_in_serverpool
     def delete_router(self, context, router_id):
         LOG.debug(_("NeutronRestProxyV2: delete_router() called"))
 
@@ -1015,6 +1036,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
                                                       interface_id)
             return del_ret
 
+    @put_context_in_serverpool
     def create_floatingip(self, context, floatingip):
         LOG.debug(_("NeutronRestProxyV2: create_floatingip() called"))
 
@@ -1038,6 +1060,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
             # return created floating IP
             return new_fl_ip
 
+    @put_context_in_serverpool
     def update_floatingip(self, context, id, floatingip):
         LOG.debug(_("NeutronRestProxyV2: update_floatingip() called"))
 
@@ -1054,6 +1077,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
                 self._send_floatingip_update(context)
             return new_fl_ip
 
+    @put_context_in_serverpool
     def delete_floatingip(self, context, id):
         LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called"))
 
@@ -1078,6 +1102,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
     # overriding method from l3_db as original method calls
     # self.delete_floatingip() which in turn calls self.delete_port() which
     # is locked with 'bsn-port-barrier'
+    @put_context_in_serverpool
     def delete_disassociated_floatingips(self, context, network_id):
         query = self._model_query(context, l3_db.FloatingIP)
         query = query.filter_by(floating_network_id=network_id,
index c39c4e9f09518a1d25017b51de07f5669147e702..8ee64778f51a0b0f41440f07cafab87b58932a9f 100644 (file)
@@ -39,6 +39,7 @@ import socket
 import ssl
 
 import eventlet
+import eventlet.corolocal
 from oslo.config import cfg
 
 from neutron.common import exceptions
@@ -121,7 +122,7 @@ class ServerProxy(object):
         return self.capabilities
 
     def rest_call(self, action, resource, data='', headers={}, timeout=False,
-                  reconnect=False):
+                  reconnect=False, hash_handler=None):
         uri = self.base_uri + resource
         body = json.dumps(data)
         if not headers:
@@ -131,7 +132,12 @@ class ServerProxy(object):
         headers['NeutronProxy-Agent'] = self.name
         headers['Instance-ID'] = self.neutron_id
         headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
-        headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or ''
+        if hash_handler:
+            # this will be excluded on calls that don't need hashes
+            # (e.g. topology sync, capability checks)
+            headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
+        else:
+            hash_handler = cdb.HashHandler()
         if 'keep-alive' in self.capabilities:
             headers['Connection'] = 'keep-alive'
         else:
@@ -178,9 +184,7 @@ class ServerProxy(object):
         try:
             self.currentconn.request(action, uri, body, headers)
             response = self.currentconn.getresponse()
-            newhash = response.getheader(HASH_MATCH_HEADER)
-            if newhash:
-                self._put_consistency_hash(newhash)
+            hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER))
             respstr = response.read()
             respdata = respstr
             if response.status in self.success_codes:
@@ -216,10 +220,6 @@ class ServerProxy(object):
                                                     'data': ret[3]})
         return ret
 
-    def _put_consistency_hash(self, newhash):
-        self.mypool.consistency_hash = newhash
-        cdb.put_consistency_hash(newhash)
-
 
 class ServerPool(object):
 
@@ -235,6 +235,7 @@ class ServerPool(object):
         self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
         self.base_uri = base_uri
         self.name = name
+        self.contexts = {}
         self.timeout = cfg.CONF.RESTPROXY.server_timeout
         self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
         default_port = 8000
@@ -246,10 +247,6 @@ class ServerPool(object):
         self.get_topo_function = None
         self.get_topo_function_args = {}
 
-        # Hash to send to backend with request as expected previous
-        # state to verify consistency.
-        self.consistency_hash = cdb.get_consistency_hash()
-
         if not servers:
             raise cfg.Error(_('Servers not defined. Aborting server manager.'))
         servers = [s if len(s.rsplit(':', 1)) == 2
@@ -268,6 +265,19 @@ class ServerPool(object):
                        cfg.CONF.RESTPROXY.consistency_interval)
         LOG.debug(_("ServerPool: initialization done"))
 
+    def set_context(self, context):
+        # this context needs to be local to the greenthread
+        # so concurrent requests don't use the wrong context
+        self.contexts[eventlet.corolocal.get_ident()] = context
+
+    def pop_context(self):
+        # Don't store these contexts after use. They should only
+        # last for one request.
+        try:
+            return self.contexts.pop(eventlet.corolocal.get_ident())
+        except KeyError:
+            return None
+
     def get_capabilities(self):
         # lookup on first try
         try:
@@ -394,12 +404,14 @@ class ServerPool(object):
     @utils.synchronized('bsn-rest-call')
     def rest_call(self, action, resource, data, headers, ignore_codes,
                   timeout=False):
+        hash_handler = cdb.HashHandler(context=self.pop_context())
         good_first = sorted(self.servers, key=lambda x: x.failed)
         first_response = None
         for active_server in good_first:
             ret = active_server.rest_call(action, resource, data, headers,
                                           timeout,
-                                          reconnect=self.always_reconnect)
+                                          reconnect=self.always_reconnect,
+                                          hash_handler=hash_handler)
             # If inconsistent, do a full synchronization
             if ret[0] == httplib.CONFLICT:
                 if not self.get_topo_function:
index 3a54e315b8c234e27b9ab417b81e4260e1df82f2..a6883d36bb0ef276e7a84a729978d6b474762a8e 100644 (file)
@@ -352,7 +352,8 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):
             # making a call should trigger a conflict sync
             pl.servers.rest_call('GET', '/', '', None, [])
             srestmock.assert_has_calls([
-                mock.call('GET', '/', '', None, False, reconnect=True),
+                mock.call('GET', '/', '', None, False, reconnect=True,
+                          hash_handler=mock.ANY),
                 mock.call('PUT', '/topology',
                           {'routers': [], 'networks': []},
                           timeout=None)