Summary:
Adds an optimistic locking strategy for the Big Switch
server manager so that multiple Neutron servers
communicating with the backend do not retrieve and use
the consistency hash simultaneously.
The bsn-rest-call semaphore is removed because serialization
is now provided by the new locking scheme.
A new DB engine is added because the consistency hashes
need a life-cycle with rollbacks and other DB operations
that cannot impact, or be impacted by, database operations
happening on the regular Neutron objects.
Unit tests are included for each of the new branches
introduced.
Problem Statement:
Requests to the Big Switch controllers must contain the
consistency hash value received from the previous update.
Otherwise, an inconsistency error will be triggered which
will force a synchronization. Essentially, a new backend
call must be prevented from reading from the consistency
hash table in the DB until the previous call has updated
the table with the hash from the server response.
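A self-contained sketch of the failure mode (hypothetical
names, not code from this patch):

  # the controller only accepts a call carrying the hash it
  # returned on its previous response
  expected = 'HASH-1'          # hash the controller handed out last

  def call_backend(sent_hash, new_hash):
      global expected
      if sent_hash != expected:
          return None          # inconsistency error -> forced full sync
      expected = new_hash
      return new_hash

  # both servers read 'HASH-1' from the DB before either stores an update
  server_a_view = server_b_view = 'HASH-1'
  print(call_backend(server_a_view, 'HASH-2'))   # accepted, prints HASH-2
  print(call_backend(server_b_view, 'HASH-3'))   # prints None: stale hash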
This can be addressed by a semaphore around the rest_call
function for the single server use case and by a table lock
on the consistency table for multiple Neutron servers.
However, both solutions are inadequate because a single
Neutron server does not scale and a table lock is not
supported by common SQL HA deployments (e.g. Galera).
This issue was previously addressed by deploying servers
in an active-standby configuration. However, that only
prevented the problem for HTTP API calls. All Neutron
servers would respond to RPC messages, some of which would
result in a port update and possible backend call which
would trigger a conflict if it happened at the same time
as a backend call from another server. These unnecessary
syncs are unsustainable as the topology increases beyond
~3k VMs.
Any solution needs to be back-portable to Icehouse so new
database tables, new requirements, etc. are all out of the
question.
Solution:
This patch stores the lock for the consistency hash as a part
of the DB record. The guarantees the database provides
around atomic insertion and constrained atomic updates
supply the primitives necessary to ensure that only one
process/thread can lock the record at once.
The read_for_update method is modified so that it does not
return the stored hash until the caller's lock identifier
has been written into the current record or inserted as a
new record. By using an UPDATE query with a WHERE clause
restricting to the current state, only one of many
concurrent callers to the DB will successfully update the
rows. If a caller sees that it didn't update any rows, it
starts the lock-acquisition process over.
If a caller observes that the same ID has held the lock for
longer than MAX_LOCK_WAIT_TIME (15 seconds in this patch),
it will assume the holder has died and will attempt to take
the lock. This is also done with a concurrency-safe UPDATE
call since many other callers may attempt to do the same
thing. If the update fails because the lock was taken by
someone else, the process starts over.
Some pseudo-code resembling the logic:
  read_current_lock
  if no_record:
      insert_lock
      sleep_and_retry if constraint_violation else return
  if current_is_locked and not timer_exceeded:
      sleep_and_retry
  if update_record_with_lock:
      return
  else:
      sleep_and_retry
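For reference, the constrained UPDATE behaves like a
compare-and-swap. A minimal standalone sketch with SQLAlchemy
Core (table and column definitions here are assumed for
illustration and simply mirror the consistency hash model):

  import sqlalchemy as sa

  metadata = sa.MetaData()
  consistency = sa.Table(
      'consistencyhashes', metadata,
      sa.Column('hash_id', sa.String(255), primary_key=True),
      sa.Column('hash', sa.String(255), nullable=False))

  def try_to_lock(engine, hash_id, old_hash, lock_marker):
      # the WHERE clause restricts the UPDATE to the exact state this
      # caller read; only one concurrent caller can match it, the rest
      # see rowcount == 0 and must retry
      stmt = (sa.update(consistency)
              .where(consistency.c.hash_id == hash_id)
              .where(consistency.c.hash == old_hash)
              .values(hash=lock_marker + old_hash))
      with engine.begin() as conn:
          return conn.execute(stmt).rowcount == 1

A caller that gets False back simply sleeps briefly, re-reads
the record and tries again, which is what the loop above does.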
Closes-Bug: #1374261
Change-Id: Ifa5a7c9749952bc2785a9bf3fed69ad55bf21acc
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import random
+import re
+import string
+import time
+
+from oslo.config import cfg
+from oslo.db import exception as db_exc
+from oslo.db.sqlalchemy import session
import sqlalchemy as sa
-from neutron.db import api as db
from neutron.db import model_base
+from neutron.openstack.common.gettextutils import _LI, _LW
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
+# Maximum time in seconds to wait for a single record lock to be released
+# NOTE: The total time waiting may exceed this if there are multiple servers
+# waiting for the same lock
+MAX_LOCK_WAIT_TIME = 15
+
+
+def setup_db():
+ '''Helper to register models for unit tests'''
+ if HashHandler._FACADE is None:
+ HashHandler._FACADE = session.EngineFacade.from_config(
+ cfg.CONF, sqlite_fk=True)
+ ConsistencyHash.metadata.create_all(
+ HashHandler._FACADE.get_engine())
+
+
+def clear_db():
+ '''Helper to unregister models and clear engine in unit tests'''
+ if not HashHandler._FACADE:
+ return
+ ConsistencyHash.metadata.drop_all(HashHandler._FACADE.get_engine())
+ HashHandler._FACADE = None
class ConsistencyHash(model_base.BASEV2):
class HashHandler(object):
'''
A wrapper object to keep track of the session between the read
and the update operations.
+
+ This class needs an SQL engine completely independent of the main
+ neutron connection so rollbacks from consistency hash operations don't
+ affect the parent sessions.
'''
- def __init__(self, context=None, hash_id='1'):
+ _FACADE = None
+
+ def __init__(self, hash_id='1'):
+ if HashHandler._FACADE is None:
+ HashHandler._FACADE = session.EngineFacade.from_config(
+ cfg.CONF, sqlite_fk=True)
self.hash_id = hash_id
- self.session = db.get_session() if not context else context.session
- self.hash_db_obj = None
+ self.session = HashHandler._FACADE.get_session(autocommit=True,
+ expire_on_commit=False)
+ self.random_lock_id = ''.join(random.choice(string.ascii_uppercase
+ + string.digits)
+ for _ in range(10))
+ self.lock_marker = 'LOCKED_BY[%s]' % self.random_lock_id
+
+ def _get_current_record(self):
+ with self.session.begin(subtransactions=True):
+ res = (self.session.query(ConsistencyHash).
+ filter_by(hash_id=self.hash_id).first())
+ if res:
+ self.session.refresh(res) # make sure latest is loaded from db
+ return res
+
+ def _insert_empty_hash_with_lock(self):
+ # try to insert a new hash, return False on conflict
+ try:
+ with self.session.begin(subtransactions=True):
+ res = ConsistencyHash(hash_id=self.hash_id,
+ hash=self.lock_marker)
+ self.session.add(res)
+ return True
+ except db_exc.DBDuplicateEntry:
+ # another server created a new record at the same time
+ return False
+
+ def _optimistic_update_hash_record(self, old_record, new_hash):
+ # Optimistic update strategy. Returns True if successful, else False.
+ query = sa.update(ConsistencyHash.__table__).values(hash=new_hash)
+ query = query.where(ConsistencyHash.hash_id == old_record.hash_id)
+ query = query.where(ConsistencyHash.hash == old_record.hash)
+ with self._FACADE.get_engine().begin() as conn:
+ result = conn.execute(query)
+ # We need to check update row count in case another server is
+ # doing this at the same time. Only one will succeed, the other will
+ # not update any rows.
+ return result.rowcount != 0
+
+ def _get_lock_owner(self, record):
+ matches = re.findall(r"^LOCKED_BY\[(\w+)\]", record)
+ if not matches:
+ return None
+ return matches[0]
def read_for_update(self):
- # REVISIT(kevinbenton): locking here with the DB is prone to deadlocks
- # in various multi-REST-call scenarios (router intfs, flips, etc).
- # Since it doesn't work in Galera deployments anyway, another sync
- # mechanism will have to be introduced to prevent inefficient double
- # syncs in HA deployments.
+ # An optimistic locking strategy with a timeout to avoid using a
+ # consistency hash while another server is using it. This will
+ # not return until a lock is acquired either normally or by stealing
+ # it after an individual ID holds it for greater than
+ # MAX_LOCK_WAIT_TIME.
+ lock_wait_start = None
+ last_lock_owner = None
+ while True:
+ res = self._get_current_record()
+ if not res:
+ # no current entry. try to insert to grab lock
+ if not self._insert_empty_hash_with_lock():
+ # A failed insert after missing current record means
+ # a concurrent insert occurred. Start process over to
+ # find the new record.
+ LOG.debug("Concurrent record inserted. Retrying.")
+ time.sleep(0.25)
+ continue
+ # The empty hash was successfully inserted with our lock
+ return ''
+
+ current_lock_owner = self._get_lock_owner(res.hash)
+ if not current_lock_owner:
+ # no current lock. attempt to lock
+ new = self.lock_marker + res.hash
+ if not self._optimistic_update_hash_record(res, new):
+ # someone else beat us to it. restart process to wait
+ # for new lock ID to be removed
+ LOG.debug(
+ "Failed to acquire lock. Restarting lock wait. "
+ "Previous hash: %(prev)s. Attempted update: %(new)s" %
+ {'prev': res.hash, 'new': new})
+ time.sleep(0.25)
+ continue
+ # successfully got the lock
+ return res.hash
+
+ LOG.debug("This request's lock ID is %(this)s. "
+ "DB lock held by %(that)s" %
+ {'this': self.random_lock_id,
+ 'that': current_lock_owner})
+
+ if current_lock_owner == self.random_lock_id:
+ # no change needed, we already have the table lock due to
+ # previous read_for_update call.
+ # return hash with lock tag stripped off for use in a header
+ return res.hash.replace(self.lock_marker, '')
+
+ if current_lock_owner != last_lock_owner:
+ # The owner changed since the last iteration, but it
+ # wasn't to us. Reset the counter. Log if not
+ # first iteration.
+ if lock_wait_start:
+ LOG.debug("Lock owner changed from %(old)s to %(new)s "
+ "while waiting to acquire it.",
+ {'old': last_lock_owner,
+ 'new': current_lock_owner})
+ lock_wait_start = time.time()
+ last_lock_owner = current_lock_owner
+ if time.time() - lock_wait_start > MAX_LOCK_WAIT_TIME:
+ # the lock has been held too long, steal it
+ LOG.warning(_LW("Gave up waiting for consistency DB "
+ "lock, trying to take it. "
+ "Current hash is: %s"), res.hash)
+ new_db_value = res.hash.replace(current_lock_owner,
+ self.random_lock_id)
+ if self._optimistic_update_hash_record(res, new_db_value):
+ # strip the previous owner's lock tag so the caller gets a usable hash
+ return res.hash.replace('LOCKED_BY[%s]' % current_lock_owner, '')
+ LOG.info(_LI("Failed to take lock. Another process updated "
+ "the DB first."))
+
+ def clear_lock(self):
+ LOG.debug("Clearing hash record lock of id %s" % self.random_lock_id)
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
- if not res:
- return ''
- self.hash_db_obj = res
- return res.hash
+ if not res:
+ LOG.warning(_LW("Hash record already gone, no lock to clear."))
+ return
+ if not res.hash.startswith(self.lock_marker):
+ # if these are frequent the server is too slow
+ LOG.warning(_LW("Another server already removed the lock. %s"),
+ res.hash)
+ return
+ res.hash = res.hash.replace(self.lock_marker, '')
def put_hash(self, hash):
hash = hash or ''
with self.session.begin(subtransactions=True):
- if self.hash_db_obj is not None:
- self.hash_db_obj.hash = hash
+ res = (self.session.query(ConsistencyHash).
+ filter_by(hash_id=self.hash_id).first())
+ if res:
+ res.hash = hash
else:
conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
self.session.merge(conhash)
from oslo.serialization import jsonutils
from neutron.common import exceptions
-from neutron.common import utils
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
# don't clear hash from DB if a hash header wasn't present
if hash_value is not None:
hash_handler.put_hash(hash_value)
+ else:
+ hash_handler.clear_lock()
try:
respdata = jsonutils.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
+ else:
+ # release lock so others don't have to wait for timeout
+ hash_handler.clear_lock()
+
ret = (response.status, response.reason, respstr, respdata)
except httplib.HTTPException:
# If we were using a cached connection, try again with a new one.
"""
return resp[0] in SUCCESS_CODES
- @utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=False):
context = self.get_context_ref()
# backend controller
cdict.pop('auth_token', None)
headers[REQ_CONTEXT_HEADER] = jsonutils.dumps(cdict)
- hash_handler = cdb.HashHandler(context=context)
+ hash_handler = cdb.HashHandler()
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
self.setup_config_files()
self.setup_patches()
super(BigSwitchDhcpAgentNotifierTestCase, self).setUp()
+ self.setup_db()
self.startHttpPatch()
import neutron.common.test_lib as test_lib
from neutron.plugins.bigswitch import config
+from neutron.plugins.bigswitch.db import consistency_db
from neutron.tests.unit.bigswitch import fake_server
test_lib.test_config['config_files'] = [os.path.join(etc_path,
'restproxy.ini.test')]
self.addCleanup(cfg.CONF.reset)
+ self.addCleanup(consistency_db.clear_db)
config.register_config()
# Only try SSL on SSL tests
cfg.CONF.set_override('server_ssl', False, 'RESTPROXY')
self.httpPatch = mock.patch(HTTPCON,
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
+
+ def setup_db(self):
+ # setup the db engine and models for the consistency db
+ consistency_db.setup_db()
service_plugins = {'L3_ROUTER_NAT': self._l3_plugin_name}
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name, service_plugins=service_plugins)
+ self.setup_db()
self.port_create_status = 'BUILD'
self.startHttpPatch()
self.setup_config_files()
super(test_extradhcp.ExtraDhcpOptDBTestCase,
self).setUp(plugin=self._plugin_name)
+ self.setup_db()
self.startHttpPatch()
super(RouterDBTestBase, self).setUp(plugin=self._plugin_name,
ext_mgr=ext_mgr,
service_plugins=service_plugins)
+ self.setup_db()
cfg.CONF.set_default('allow_overlapping_ips', False)
self.plugin_obj = manager.NeutronManager.get_service_plugins().get(
'L3_ROUTER_NAT')
self.setup_patches()
self._attribute_map_bk_ = {}
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
+ self.setup_db()
plugin = manager.NeutronManager.get_plugin()
self.notifier = plugin.notifier
self.rpc = plugin.endpoints[0]
import mock
from oslo.config import cfg
+from oslo.db import exception as db_exc
from oslo.serialization import jsonutils
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
-from neutron.plugins.bigswitch.db import consistency_db as cdb
+from neutron.plugins.bigswitch.db import consistency_db
from neutron.plugins.bigswitch import servermanager
from neutron.tests.unit.bigswitch import test_restproxy_plugin as test_rp
def test_delete_failure_sets_bad_hash(self):
pl = manager.NeutronManager.get_plugin()
- hash_handler = cdb.HashHandler()
+ hash_handler = consistency_db.HashHandler()
with mock.patch(
SERVERMANAGER + '.ServerProxy.rest_call',
return_value=(httplib.INTERNAL_SERVER_ERROR, 0, 0, 0)
con = self.sm.HTTPSConnectionWithValidation('127.0.0.1', 0, timeout=1)
# if httpcon was created, a connect attempt should raise a socket error
self.assertRaises(socket.error, con.connect)
+
+
+class HashLockingTests(test_rp.BigSwitchProxyPluginV2TestCase):
+
+ def _get_hash_from_handler_db(self, handler):
+ with handler.session.begin(subtransactions=True):
+ res = (handler.session.query(consistency_db.ConsistencyHash).
+ filter_by(hash_id=handler.hash_id).first())
+ return res.hash
+
+ def test_hash_handle_lock_no_initial_record(self):
+ handler = consistency_db.HashHandler()
+ h1 = handler.read_for_update()
+ # return to caller should be empty even with lock in DB
+ self.assertFalse(h1)
+ # db should have a lock marker
+ self.assertEqual(handler.lock_marker,
+ self._get_hash_from_handler_db(handler))
+ # an entry should clear the lock
+ handler.put_hash('DIGEST')
+ self.assertEqual('DIGEST', self._get_hash_from_handler_db(handler))
+
+ def test_hash_handle_lock_existing_record(self):
+ handler = consistency_db.HashHandler()
+ handler.put_hash('DIGEST') # set initial hash
+
+ h1 = handler.read_for_update()
+ self.assertEqual('DIGEST', h1)
+ self.assertEqual(handler.lock_marker + 'DIGEST',
+ self._get_hash_from_handler_db(handler))
+
+ # make sure update works
+ handler.put_hash('DIGEST2')
+ self.assertEqual('DIGEST2', self._get_hash_from_handler_db(handler))
+
+ def test_db_duplicate_on_insert(self):
+ handler = consistency_db.HashHandler()
+ with mock.patch.object(
+ handler.session, 'add', side_effect=[db_exc.DBDuplicateEntry, '']
+ ) as add_mock:
+ handler.read_for_update()
+ # duplicate insert failure should result in retry
+ self.assertEqual(2, add_mock.call_count)
+
+ def test_update_hit_no_records(self):
+ handler = consistency_db.HashHandler()
+ # set initial hash so update will be required
+ handler.put_hash('DIGEST')
+ with mock.patch.object(handler._FACADE, 'get_engine') as ge:
+ conn = ge.return_value.begin.return_value.__enter__.return_value
+ firstresult = mock.Mock()
+ # a rowcount of 0 simulates the effect of another db client
+ # updating the same record the handler was trying to update
+ firstresult.rowcount = 0
+ secondresult = mock.Mock()
+ secondresult.rowcount = 1
+ conn.execute.side_effect = [firstresult, secondresult]
+ handler.read_for_update()
+ # update should have been called again after the failure
+ self.assertEqual(2, conn.execute.call_count)
+
+ def test_handler_already_holding_lock(self):
+ handler = consistency_db.HashHandler()
+ handler.read_for_update() # lock the table
+ with mock.patch.object(handler._FACADE, 'get_engine') as ge:
+ handler.read_for_update()
+ # get engine should not have been called because no update
+ # should have been made
+ self.assertFalse(ge.called)
+
+ def test_clear_lock(self):
+ handler = consistency_db.HashHandler()
+ handler.put_hash('SOMEHASH')
+ handler.read_for_update() # lock the table
+ self.assertEqual(handler.lock_marker + 'SOMEHASH',
+ self._get_hash_from_handler_db(handler))
+ handler.clear_lock()
+ self.assertEqual('SOMEHASH',
+ self._get_hash_from_handler_db(handler))
+
+ def test_clear_lock_skip_after_steal(self):
+ handler1 = consistency_db.HashHandler()
+ handler1.read_for_update() # lock the table
+ handler2 = consistency_db.HashHandler()
+ with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME', new=0):
+ handler2.read_for_update()
+ before = self._get_hash_from_handler_db(handler1)
+ # handler1 should not clear handler2's lock
+ handler1.clear_lock()
+ self.assertEqual(before, self._get_hash_from_handler_db(handler1))
+
+ def test_take_lock_from_other(self):
+ handler1 = consistency_db.HashHandler()
+ handler1.read_for_update() # lock the table
+ handler2 = consistency_db.HashHandler()
+ with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME') as mlock:
+ # make handler2 wait for only one iteration
+ mlock.__lt__.side_effect = [False, True]
+ handler2.read_for_update()
+ # once MAX LOCK exceeded, comparisons should stop due to lock steal
+ self.assertEqual(2, mlock.__lt__.call_count)
+ dbentry = self._get_hash_from_handler_db(handler1)
+ # handler2 should have the lock
+ self.assertIn(handler2.lock_marker, dbentry)
+ self.assertNotIn(handler1.lock_marker, dbentry)
+ # lock protection only blocks read_for_update, anyone can change
+ handler1.put_hash('H1')
+
+ def test_failure_to_steal_lock(self):
+ handler1 = consistency_db.HashHandler()
+ handler1.read_for_update() # lock the table
+ handler2 = consistency_db.HashHandler()
+ with contextlib.nested(
+ mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME'),
+ mock.patch.object(handler2, '_optimistic_update_hash_record',
+ side_effect=[False, True])
+ ) as (mlock, oplock):
+ # handler2 will go through 2 iterations since the lock will fail on
+ # the first attempt
+ mlock.__lt__.side_effect = [False, True, False, True]
+ handler2.read_for_update()
+ self.assertEqual(4, mlock.__lt__.call_count)
+ self.assertEqual(2, oplock.call_count)
def setUp(self):
super(test_ssl_certificate_base, self).setUp(self.plugin_str)
+ self.setup_db()
class TestSslSticky(test_ssl_certificate_base):