# server_auth : <username:password> (default: no auth)
# server_ssl : True | False (default: False)
# sync_data : True | False (default: False)
-# server_timeout : <int> (default: 10 seconds)
+# auto_sync_on_failure : True | False (default: True)
+# server_timeout : <int> (default: 10 seconds)
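+# consistency_interval : <int> (default: 60 seconds)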
# neutron_id : <string> (default: neutron-<hostname>)
# add_meta_server_route : True | False (default: True)
# thread_pool_size : <int> (default: 4)
# Sync data on connect
# sync_data=False
+# If neutron fails to create a resource because the backend controller
+# doesn't know of a dependency, automatically trigger a full data
+# synchronization to the controller.
+# auto_sync_on_failure=True
+
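+# Time between verifications (in seconds) that the backend controller
+# database is consistent with Neutron.
+# consistency_interval=60
+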
# Maximum number of seconds to wait for proxy request to connect and complete.
# server_timeout=10
--- /dev/null
+# Copyright 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""bsn_consistencyhashes
+
+Revision ID: 81c553f3776c
+Revises: 24c7ea5160d7
+Create Date: 2014-02-26 18:56:00.402855
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '81c553f3776c'
+down_revision = '24c7ea5160d7'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'neutron.plugins.bigswitch.plugin.NeutronRestProxyV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.create_table(
+ 'consistencyhashes',
+ sa.Column('hash_id', sa.String(255), primary_key=True),
+ sa.Column('hash', sa.String(255), nullable=False)
+ )
+
+
+def downgrade(active_plugins=None, options=None):
+ if not migration.should_run(active_plugins, migration_for_plugins):
+ return
+
+ op.drop_table('consistencyhashes')
"Floodlight controller.")),
cfg.BoolOpt('sync_data', default=False,
help=_("Sync data on connect")),
+ cfg.BoolOpt('auto_sync_on_failure', default=True,
+ help=_("If neutron fails to create a resource because "
+ "the backend controller doesn't know of a dependency, "
+ "automatically trigger a full data synchronization "
+ "to the controller.")),
+ cfg.IntOpt('consistency_interval', default=60,
+ help=_("Time between verifications (in seconds) that the "
+ "backend controller database is consistent with "
+ "Neutron")),
cfg.IntOpt('server_timeout', default=10,
help=_("Maximum number of seconds to wait for proxy request "
"to connect and complete.")),
--- /dev/null
+# Copyright 2014, Big Switch Networks
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import sqlalchemy as sa
+
+from neutron.db import api as db
+from neutron.db import model_base
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+'''
+A simple table to store the latest consistency hash
+received from a server in case neutron gets restarted.
+'''
+
+
+class ConsistencyHash(model_base.BASEV2):
+ '''
+ For now we only support one global state, so the
+ hash_id will always be '1'.
+ '''
+ __tablename__ = 'consistencyhashes'
+ hash_id = sa.Column(sa.String(255),
+ primary_key=True)
+ hash = sa.Column(sa.String(255), nullable=False)
+
+
+def get_consistency_hash(hash_id='1'):
+ session = db.get_session()
+ with session.begin(subtransactions=True):
+ query = session.query(ConsistencyHash)
+ res = query.filter_by(hash_id=hash_id).first()
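+ # no hash stored yet (e.g. first start of neutron)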
+ if not res:
+ return False
+ return res.hash
+
+
+def put_consistency_hash(hash, hash_id='1'):
+ session = db.get_session()
+ with session.begin(subtransactions=True):
+ conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
+ session.merge(conhash)
+ LOG.debug(_("Consistency hash for group %(hash_id)s updated "
+ "to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
"""
import copy
+import httplib
import re
import eventlet
if not self.servers:
LOG.warning(_("ServerPool not set!"))
- def _send_all_data(self, send_ports=True, send_floating_ips=True,
- send_routers=True):
- """Pushes all data to network ctrl (networks/ports, ports/attachments).
-
- This gives the controller an option to re-sync it's persistent store
- with neutron's current view of that data.
- """
+ def _get_all_data(self, get_ports=True, get_floating_ips=True,
+ get_routers=True):
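+ """Return a dict of all mapped networks (with ports and floating
+ IPs) and routers from neutron's current view.
+
+ Split out of _send_all_data so the servermanager can fetch the
+ topology on its own for consistency re-syncs.
+ """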
admin_context = qcontext.get_admin_context()
networks = []
for net in all_networks:
mapped_network = self._get_mapped_network_with_subnets(net)
flips_n_ports = {}
- if send_floating_ips:
+ if get_floating_ips:
flips_n_ports = self._get_network_with_floatingips(
mapped_network)
- if send_ports:
+ if get_ports:
ports = []
net_filter = {'network_id': [net.get('id')]}
net_ports = self.get_ports(admin_context,
if flips_n_ports:
networks.append(flips_n_ports)
- resource = '/topology'
- data = {
- 'networks': networks,
- }
+ data = {'networks': networks}
- if send_routers:
+ if get_routers:
routers = []
all_routers = self.get_routers(admin_context) or []
for router in all_routers:
routers.append(mapped_router)
data.update({'routers': routers})
+ return data
+ def _send_all_data(self, send_ports=True, send_floating_ips=True,
+ send_routers=True, timeout=None,
+ triggered_by_tenant=None):
+ """Pushes all data to network ctrl (networks/ports, ports/attachments).
+
+ This gives the controller an option to re-sync its persistent store
+ with neutron's current view of that data.
+ """
+ data = self._get_all_data(send_ports, send_floating_ips, send_routers)
+ data['triggered_by_tenant'] = triggered_by_tenant
errstr = _("Unable to update remote topology: %s")
- return self.servers.rest_action('PUT', resource, data, errstr)
+ return self.servers.rest_action('PUT', servermanager.TOPOLOGY_PATH,
+ data, errstr, timeout=timeout)
def _get_network_with_floatingips(self, network, context=None):
if context is None:
try:
self.servers.rest_create_port(tenant_id, net_id, port)
except servermanager.RemoteRestError as e:
- LOG.error(
- _("NeutronRestProxyV2: Unable to create port: %s"), e)
- try:
- self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
- except exceptions.PortNotFound:
- # If port is already gone from DB and there was an error
- # creating on the backend, everything is already consistent
- pass
- return
+ # 404 should never be received on a port create unless
+ # there are inconsistencies between the data in neutron
+ # and the data in the backend.
+ # Run a sync to get it consistent.
+ if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
+ e.status == httplib.NOT_FOUND and
+ servermanager.NXNETWORK in e.reason):
+ LOG.error(_("Iconsistency with backend controller "
+ "triggering full synchronization."))
+ # args depend on whether we are operating as the ML2
+ # driver or as the full plugin
+ topoargs = self.servers.get_topo_function_args
+ self._send_all_data(
+ send_ports=topoargs['get_ports'],
+ send_floating_ips=topoargs['get_floating_ips'],
+ send_routers=topoargs['get_routers'],
+ triggered_by_tenant=tenant_id
+ )
+ # If the full sync worked, the port will be created
+ # on the controller so it can be safely marked as active
+ else:
+ # Any errors that don't result in a successful auto-sync
+ # require that the port be placed into the error state.
+ LOG.error(
+ _("NeutronRestProxyV2: Unable to create port: %s"), e)
+ try:
+ self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
+ except exceptions.PortNotFound:
+ # If port is already gone from DB and there was an error
+ # creating on the backend, everything is already consistent
+ pass
+ return
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
else const.PORT_STATUS_DOWN)
try:
# init network ctrl connections
self.servers = servermanager.ServerPool(server_timeout)
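+ # give the pool a topology callback so it can run a full re-sync
+ # on its own when the backend reports a consistency error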
+ self.servers.get_topo_function = self._get_all_data
+ self.servers.get_topo_function_args = {'get_ports': True,
+ 'get_floating_ips': True,
+ 'get_routers': True}
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
import httplib
import json
import socket
+import time
+import eventlet
from oslo.config import cfg
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import log as logging
-
+from neutron.plugins.bigswitch.db import consistency_db as cdb
LOG = logging.getLogger(__name__)
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
ROUTERS_PATH = "/tenants/%s/routers/%s"
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
+TOPOLOGY_PATH = "/topology"
+HEALTH_PATH = "/health"
SUCCESS_CODES = range(200, 207)
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
504, 505]
BASE_URI = '/networkService/v1.1'
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
+HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
+# error messages
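+# matched against RemoteRestError reasons to detect a missing-network
+# dependency on the controller (see the port create handler in plugin.py)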
+NXNETWORK = 'NXVNS'
class RemoteRestError(exceptions.NeutronException):
message = _("Error in REST call to remote network "
"controller: %(reason)s")
+ status = None
+
+ def __init__(self, **kwargs):
+ self.status = kwargs.pop('status', None)
+ self.reason = kwargs.get('reason')
+ super(RemoteRestError, self).__init__(**kwargs)
class ServerProxy(object):
"""REST server proxy to a network controller."""
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
- base_uri, name):
+ base_uri, name, mypool):
self.server = server
self.port = port
self.ssl = ssl
self.neutron_id = neutron_id
self.failed = False
self.capabilities = []
+ # enable server to reference parent pool
+ self.mypool = mypool
if auth:
self.auth = 'Basic ' + base64.encodestring(auth).strip()
'cap': self.capabilities})
return self.capabilities
- def rest_call(self, action, resource, data='', headers=None):
+ def rest_call(self, action, resource, data='', headers={}, timeout=None):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
headers['NeutronProxy-Agent'] = self.name
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
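+ # include the last known consistency hash; on a mismatch the
+ # backend replies 409 and ServerPool.rest_call runs a full sync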
+ headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
if self.auth:
headers['Authorization'] = self.auth
'action': action})
conn = None
+ timeout = timeout or self.timeout
if self.ssl:
conn = httplib.HTTPSConnection(
- self.server, self.port, timeout=self.timeout)
+ self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTPS '
'connection'))
return 0, None, None, None
else:
conn = httplib.HTTPConnection(
- self.server, self.port, timeout=self.timeout)
+ self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTP '
'connection'))
try:
conn.request(action, uri, body, headers)
response = conn.getresponse()
+ newhash = response.getheader(HASH_MATCH_HEADER)
+ if newhash:
+ self._put_consistency_hash(newhash)
respstr = response.read()
respdata = respstr
if response.status in self.success_codes:
'data': ret[3]})
return ret
+ def _put_consistency_hash(self, newhash):
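+ # update the hash sent with subsequent requests and persist it
+ # so it survives a restart of neutron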
+ self.mypool.consistency_hash = newhash
+ cdb.put_consistency_hash(newhash)
+
class ServerPool(object):
if timeout is not None:
self.timeout = timeout
+ # Function to use to retrieve topology for consistency syncs.
+ # Needs to be set by the module that uses the servermanager.
+ self.get_topo_function = None
+ self.get_topo_function_args = {}
+
+ # Hash to send to backend with request as expected previous
+ # state to verify consistency.
+ self.consistency_hash = cdb.get_consistency_hash()
+ eventlet.spawn(self._consistency_watchdog,
+ cfg.CONF.RESTPROXY.consistency_interval)
+
if not servers:
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
servers = [s if len(s.rsplit(':', 1)) == 2
def server_proxy_for(self, server, port):
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
- self.timeout, self.base_uri, self.name)
+ self.timeout, self.base_uri, self.name, mypool=self)
def server_failure(self, resp, ignore_codes=[]):
"""Define failure codes as required.
return resp[0] in SUCCESS_CODES
@utils.synchronized('bsn-rest-call')
- def rest_call(self, action, resource, data, headers, ignore_codes):
+ def rest_call(self, action, resource, data, headers, ignore_codes,
+ timeout=None):
good_first = sorted(self.servers, key=lambda x: x.failed)
+ first_response = None
for active_server in good_first:
- ret = active_server.rest_call(action, resource, data, headers)
+ ret = active_server.rest_call(action, resource, data, headers,
+ timeout)
+ # If inconsistent, do a full synchronization
+ if ret[0] == httplib.CONFLICT:
+ if not self.get_topo_function:
+ raise cfg.Error(_('Server requires synchronization, '
+ 'but no topology function was defined.'))
+ data = self.get_topo_function(**self.get_topo_function_args)
+ active_server.rest_call('PUT', TOPOLOGY_PATH, data,
+ timeout=None)
+ # Store the first response as the error to be bubbled up to the
+ # user since it was a good server. Subsequent servers will most
+ # likely be cluster slaves and won't have a useful error for the
+ # user (e.g. 302 redirect to master)
+ if not first_response:
+ first_response = ret
if not self.server_failure(ret, ignore_codes):
active_server.failed = False
return ret
{'action': action,
'server': tuple((s.server,
s.port) for s in self.servers)})
- return (0, None, None, None)
+ return first_response
def rest_action(self, action, resource, data='', errstr='%s',
- ignore_codes=[], headers=None):
+ ignore_codes=[], headers={}, timeout=None):
"""
Wrapper for rest_call that verifies success and raises a
RemoteRestError on failure with a provided error string
"""
if not ignore_codes and action == 'DELETE':
ignore_codes = [404]
- resp = self.rest_call(action, resource, data, headers, ignore_codes)
+ resp = self.rest_call(action, resource, data, headers, ignore_codes,
+ timeout)
if self.server_failure(resp, ignore_codes):
LOG.error(errstr, resp[2])
- raise RemoteRestError(reason=resp[2])
+ raise RemoteRestError(reason=resp[2], status=resp[0])
if resp[0] in ignore_codes:
LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
"code %(code)s on %(action)s action to resource "
resource = FLOATINGIPS_PATH % (tenant_id, oldid)
errstr = _("Unable to delete floating IP: %s")
self.rest_action('DELETE', resource, errstr=errstr)
+
+ def _consistency_watchdog(self, polling_interval=60):
+ if 'consistency' not in self.get_capabilities():
+ LOG.warning(_("Backend server(s) do not support automated "
+ "consitency checks."))
+ return
+ while True:
+ # If consistency is supported, all we have to do is make any
+ # rest call and the consistency header will be added. If it
+ # doesn't match, the backend will return a synchronization error
+ # that will be handled by the rest_call.
+ time.sleep(polling_interval)
+ self.rest_call('GET', HEALTH_PATH, '', {}, [])
# under the License.
#
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
+# @author: Kevin Benton, Big Switch Networks, Inc.
+
import eventlet
from oslo.config import cfg
from neutron.plugins.bigswitch import config as pl_config
from neutron.plugins.bigswitch.db import porttracker_db
from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
-from neutron.plugins.bigswitch.servermanager import ServerPool
+from neutron.plugins.bigswitch import servermanager
from neutron.plugins.ml2 import driver_api as api
self.native_bulk_support = False
# init network ctrl connections
- self.servers = ServerPool(server_timeout)
+ self.servers = servermanager.ServerPool(server_timeout)
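+ # only ports are synchronized for ML2; routers and floating IPs
+ # are not managed by this mechanism driver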
+ self.servers.get_topo_function = self._get_all_data
+ self.servers.get_topo_function_args = {'get_ports': True,
+ 'get_floating_ips': False,
+ 'get_routers': False}
self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
LOG.debug(_("Initialization done"))
prepped_port = self._map_state_and_status(prepped_port)
if (portbindings.HOST_ID not in prepped_port or
prepped_port[portbindings.HOST_ID] == ''):
+ LOG.warning(_("Ignoring port notification to controller because "
+ "of missing host ID."))
# in ML2, controller doesn't care about ports without
# the host_id set
return False
import json
from neutron.openstack.common import log as logging
+from neutron.plugins.bigswitch import servermanager
LOG = logging.getLogger(__name__)
def read(self):
return "{'status': '200 OK'}"
+ def getheader(self, header):
+ return None
+
class HTTPResponseMock404(HTTPResponseMock):
status = 404
reason = 'Not Found'
def read(self):
- return "{'status': '404 Not Found'}"
+ return "{'status': '%s 404 Not Found'}" % servermanager.NXNETWORK
class HTTPResponseMock500(HTTPResponseMock):
pass
+class HTTPConnectionMock404(HTTPConnectionMock):
+
+ def __init__(self, server, port, timeout):
+ self.response = HTTPResponseMock404(None)
+ self.broken = True
+
+
class HTTPConnectionMock500(HTTPConnectionMock):
def __init__(self, server, port, timeout):
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
+SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPCON = 'httplib.HTTPConnection'
SPAWN = 'eventlet.GreenPool.spawn_n'
+CWATCH = SERVER_MANAGER + '.ServerPool._consistency_watchdog'
class BigSwitchTestBase(object):
self.plugin_notifier_p = mock.patch(NOTIFIER)
self.callbacks_p = mock.patch(CALLBACKS)
self.spawn_p = mock.patch(SPAWN)
+ self.watch_p = mock.patch(CWATCH)
self.addCleanup(mock.patch.stopall)
self.addCleanup(db.clear_db)
self.callbacks_p.start()
self.plugin_notifier_p.start()
self.httpPatch.start()
self.spawn_p.start()
+ self.watch_p.start()
# See the License for the specific language governing permissions and
# limitations under the License.
-from mock import patch
+from contextlib import nested
+import mock
from oslo.config import cfg
import webob.exc
import neutron.tests.unit.test_db_plugin as test_plugin
import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
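+# keep the module-level name 'patch' so existing tests need not change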
+patch = mock.patch
+
class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
test_plugin.NeutronDbPluginV2TestCase):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 200)
+ def test_create404_triggers_sync(self):
+ # allow async port thread for this patch
+ self.spawn_p.stop()
+ with nested(
+ self.subnet(),
+ patch('httplib.HTTPConnection', create=True,
+ new=fake_server.HTTPConnectionMock404),
+ patch(test_base.RESTPROXY_PKG_PATH
+ + '.NeutronRestProxyV2._send_all_data')
+ ) as (s, mock_http, mock_send_all):
+ with self.port(subnet=s, device_id='somedevid') as p:
+ # wait for the async port thread to finish
+ plugin = NeutronManager.get_plugin()
+ plugin.evpool.waitall()
+ call = mock.call(
+ send_routers=True, send_ports=True, send_floating_ips=True,
+ triggered_by_tenant=p['port']['tenant_id']
+ )
+ mock_send_all.assert_has_calls([call])
+ self.spawn_p.start()
+
class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
BigSwitchProxyPluginV2TestCase,
# See the License for the specific language governing permissions and
# limitations under the License.
+from contextlib import nested
+import mock
import webob.exc
from neutron.extensions import portbindings
+from neutron.manager import NeutronManager
+from neutron.plugins.bigswitch import servermanager
from neutron.plugins.ml2 import config as ml2_config
from neutron.plugins.ml2.drivers import type_vlan as vlan_config
import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
PHYS_NET = 'physnet1'
VLAN_START = 1000
VLAN_END = 1100
+SERVER_POOL = 'neutron.plugins.bigswitch.servermanager.ServerPool'
+DRIVER_MOD = 'neutron.plugins.ml2.drivers.mech_bigswitch.driver'
+DRIVER = DRIVER_MOD + '.BigSwitchMechanismDriver'
class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
+
+ def test_create404_triggers_background_sync(self):
+ # allow the async background thread to run for this test
+ self.spawn_p.stop()
+ with nested(
+ mock.patch(SERVER_POOL + '.rest_create_port',
+ side_effect=servermanager.RemoteRestError(
+ reason=servermanager.NXNETWORK, status=404)),
+ mock.patch(DRIVER + '._send_all_data'),
+ self.port(**{'device_id': 'devid', 'binding:host_id': 'host'})
+ ) as (mock_http, mock_send_all, p):
+ # wait for thread to finish
+ mm = NeutronManager.get_plugin().mechanism_manager
+ bigdriver = mm.mech_drivers['bigswitch'].obj
+ bigdriver.evpool.waitall()
+ mock_send_all.assert_has_calls([
+ mock.call(
+ send_routers=False, send_ports=True,
+ send_floating_ips=False,
+ triggered_by_tenant=p['port']['tenant_id']
+ )
+ ])
+ self.spawn_p.start()