# under the License.
import sqlalchemy as sa
+from neutron.common import exceptions
from neutron.db import api as db
from neutron.db import model_base
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-'''
-A simple table to store the latest consistency hash
-received from a server in case neutron gets restarted.
-'''
+
class MultipleReadForUpdateCalls(exceptions.NeutronException):
    """Raised when a handler attempts a second concurrent locked read."""
    message = _("Only one read_for_update call may be made at a time.")
class ConsistencyHash(model_base.BASEV2):
'''
+ A simple table to store the latest consistency hash
+ received from a server.
For now we only support one global state so the
hash_id will always be '1'
'''
hash = sa.Column(sa.String(255), nullable=False)
-def get_consistency_hash(hash_id='1'):
- session = db.get_session()
- with session.begin(subtransactions=True):
- query = session.query(ConsistencyHash)
- res = query.filter_by(hash_id=hash_id).first()
- if not res:
- return False
- return res.hash
class HashHandler(object):
    '''
    Wraps a database session and holds a row-level SQL lock between the
    read of the consistency hash and its update, so other servers cannot
    read the hash while this one is mid-transaction.
    '''

    def __init__(self, context=None, hash_id='1'):
        self.hash_id = hash_id
        # Reuse the request's session when a context is supplied,
        # otherwise open a fresh one.
        self.session = context.session if context else db.get_session()
        self.hash_db_obj = None
        self.transaction = None

    def read_for_update(self):
        """Read the current hash under a SELECT ... FOR UPDATE lock.

        Returns the stored hash string, or '' when no row exists yet.
        The transaction (and lock) stays open until put_hash() commits.
        """
        # A handler tracks only one open transaction at a time.
        if self.transaction:
            raise MultipleReadForUpdateCalls()
        self.transaction = self.session.begin(subtransactions=True)
        # Lock the row so another server cannot read the hash while this
        # transaction is in flight. NOTE(review): on MySQL Galera the lock
        # may not be honored, but the worst case there is a double-sync.
        record = (self.session.query(ConsistencyHash)
                  .filter_by(hash_id=self.hash_id)
                  .with_lockmode('update')
                  .first())
        if not record:
            return ''
        self.hash_db_obj = record
        return record.hash

    def put_hash(self, hash):
        """Store *hash* (None is coerced to '') and commit the transaction."""
        hash = hash or ''
        # Allow put_hash() without a prior read_for_update().
        if not self.transaction:
            self.transaction = self.session.begin(subtransactions=True)
        if self.hash_db_obj is None:
            # No row was locked by read_for_update(); merge a new one.
            self.session.merge(
                ConsistencyHash(hash_id=self.hash_id, hash=hash))
        else:
            self.hash_db_obj.hash = hash
        self.transaction.commit()
        self.transaction = None
        LOG.debug(_("Consistency hash for group %(hash_id)s updated "
                    "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash})
"""
import copy
+import functools
import httplib
import re
raise exceptions.PortNotFound(port_id=port_id)
def put_context_in_serverpool(f):
    """Decorator: stash the request context in the server pool before
    dispatching, so REST calls made while *f* runs can retrieve it.
    """
    @functools.wraps(f)
    def wrapped(self, context, *args, **kwargs):
        self.servers.set_context(context)
        return f(self, context, *args, **kwargs)
    return wrapped
+
+
class NeutronRestProxyV2(NeutronRestProxyV2Base,
addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
+ @put_context_in_serverpool
def create_network(self, context, network):
"""Create a network.
# return created network
return new_net
+ @put_context_in_serverpool
def update_network(self, context, net_id, network):
"""Updates the properties of a particular Virtual Network.
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
+ @put_context_in_serverpool
def delete_network(self, context, net_id):
"""Delete a network.
:param context: neutron api request context
self._send_delete_network(orig_net, context)
return ret_val
+ @put_context_in_serverpool
def create_port(self, context, port):
"""Create a port, which is a connection point of a device
(e.g., a VM NIC) to attach to a L2 Neutron network.
self._extend_port_dict_binding(context, port)
return [self._fields(port, fields) for port in ports]
+ @put_context_in_serverpool
def update_port(self, context, port_id, port):
"""Update values of a port.
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
+ @put_context_in_serverpool
def delete_port(self, context, port_id, l3_port_check=True):
"""Delete a port.
:param context: neutron api request context
self._delete_port(context, port_id)
self.servers.rest_delete_port(tenid, port['network_id'], port_id)
+ @put_context_in_serverpool
def create_subnet(self, context, subnet):
LOG.debug(_("NeutronRestProxyV2: create_subnet() called"))
self._send_update_network(orig_net, context)
return new_subnet
+ @put_context_in_serverpool
def update_subnet(self, context, id, subnet):
LOG.debug(_("NeutronRestProxyV2: update_subnet() called"))
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
+ @put_context_in_serverpool
def delete_subnet(self, context, id):
LOG.debug(_("NeutronRestProxyV2: delete_subnet() called"))
orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id)
return tenantset
return defaultset
+ @put_context_in_serverpool
def create_router(self, context, router):
LOG.debug(_("NeutronRestProxyV2: create_router() called"))
# return created router
return new_router
+ @put_context_in_serverpool
def update_router(self, context, router_id, router):
LOG.debug(_("NeutronRestProxyV2.update_router() called"))
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock.
# delete_router ends up calling _delete_port instead of delete_port.
@utils.synchronized('bsn-port-barrier')
+ @put_context_in_serverpool
def delete_router(self, context, router_id):
LOG.debug(_("NeutronRestProxyV2: delete_router() called"))
interface_id)
return del_ret
+ @put_context_in_serverpool
def create_floatingip(self, context, floatingip):
LOG.debug(_("NeutronRestProxyV2: create_floatingip() called"))
# return created floating IP
return new_fl_ip
+ @put_context_in_serverpool
def update_floatingip(self, context, id, floatingip):
LOG.debug(_("NeutronRestProxyV2: update_floatingip() called"))
self._send_floatingip_update(context)
return new_fl_ip
+ @put_context_in_serverpool
def delete_floatingip(self, context, id):
LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called"))
# overriding method from l3_db as original method calls
# self.delete_floatingip() which in turn calls self.delete_port() which
# is locked with 'bsn-port-barrier'
+ @put_context_in_serverpool
def delete_disassociated_floatingips(self, context, network_id):
query = self._model_query(context, l3_db.FloatingIP)
query = query.filter_by(floating_network_id=network_id,
import ssl
import eventlet
+import eventlet.corolocal
from oslo.config import cfg
from neutron.common import exceptions
return self.capabilities
def rest_call(self, action, resource, data='', headers={}, timeout=False,
- reconnect=False):
+ reconnect=False, hash_handler=None):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
headers['NeutronProxy-Agent'] = self.name
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
- headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or ''
+ if hash_handler:
+ # this will be excluded on calls that don't need hashes
+ # (e.g. topology sync, capability checks)
+ headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
+ else:
+ hash_handler = cdb.HashHandler()
if 'keep-alive' in self.capabilities:
headers['Connection'] = 'keep-alive'
else:
try:
self.currentconn.request(action, uri, body, headers)
response = self.currentconn.getresponse()
- newhash = response.getheader(HASH_MATCH_HEADER)
- if newhash:
- self._put_consistency_hash(newhash)
+ hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER))
respstr = response.read()
respdata = respstr
if response.status in self.success_codes:
'data': ret[3]})
return ret
- def _put_consistency_hash(self, newhash):
- self.mypool.consistency_hash = newhash
- cdb.put_consistency_hash(newhash)
-
class ServerPool(object):
self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
self.base_uri = base_uri
self.name = name
+ self.contexts = {}
self.timeout = cfg.CONF.RESTPROXY.server_timeout
self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
default_port = 8000
self.get_topo_function = None
self.get_topo_function_args = {}
- # Hash to send to backend with request as expected previous
- # state to verify consistency.
- self.consistency_hash = cdb.get_consistency_hash()
-
if not servers:
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
servers = [s if len(s.rsplit(':', 1)) == 2
cfg.CONF.RESTPROXY.consistency_interval)
LOG.debug(_("ServerPool: initialization done"))
+ def set_context(self, context):
+ # this context needs to be local to the greenthread
+ # so concurrent requests don't use the wrong context
+ self.contexts[eventlet.corolocal.get_ident()] = context
+
+ def pop_context(self):
+ # Don't store these contexts after use. They should only
+ # last for one request.
+ try:
+ return self.contexts.pop(eventlet.corolocal.get_ident())
+ except KeyError:
+ return None
+
def get_capabilities(self):
# lookup on first try
try:
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=False):
+ hash_handler = cdb.HashHandler(context=self.pop_context())
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
ret = active_server.rest_call(action, resource, data, headers,
timeout,
- reconnect=self.always_reconnect)
+ reconnect=self.always_reconnect,
+ hash_handler=hash_handler)
# If inconsistent, do a full synchronization
if ret[0] == httplib.CONFLICT:
if not self.get_topo_function:
# making a call should trigger a conflict sync
pl.servers.rest_call('GET', '/', '', None, [])
srestmock.assert_has_calls([
- mock.call('GET', '/', '', None, False, reconnect=True),
+ mock.call('GET', '/', '', None, False, reconnect=True,
+ hash_handler=mock.ANY),
mock.call('PUT', '/topology',
{'routers': [], 'networks': []},
timeout=None)