BigSwitch: Auto re-sync on backend inconsistencies
author    Kevin Benton <blak111@gmail.com>
          Fri, 14 Feb 2014 10:07:29 +0000 (10:07 +0000)
committer Mark McClain <mmcclain@yahoo-inc.com>
          Wed, 5 Mar 2014 20:08:39 +0000 (15:08 -0500)
If the controller supports it, pass a hash to the
controller with each REST transaction indicating
the expected state being updated. If that state is
inconsistent, the controller returns a conflict
error and the plugin/driver triggers a full
synchronization.

For controllers that don't support the consistency
hash, trigger a full background synchronization
if the plugin tries to create a port and receives
a 404 error due to the parent network not existing.

Implements: blueprint bsn-auto-resync
Change-Id: I07c92b011453f6bf81b8ee12661170817287cdd7
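
As a concrete illustration of the handshake described above, the sketch
below (Python 2, to match the plugin) sends the last known hash with each
request and pushes the full topology on a conflict. It is only a sketch:
the helper names rest_call_with_resync and get_topology are invented for
this example, and only the X-BSN-BVS-HASH-MATCH header name comes from
servermanager.py below.

    import httplib
    import json

    HASH_HEADER = 'X-BSN-BVS-HASH-MATCH'

    def rest_call_with_resync(conn, method, path, body, last_hash,
                              get_topology):
        # Send the expected-state hash with every request.
        headers = {'Content-Type': 'application/json',
                   HASH_HEADER: last_hash or ''}
        conn.request(method, path, json.dumps(body), headers)
        resp = conn.getresponse()
        resp.read()  # drain the body so the connection can be reused
        new_hash = resp.getheader(HASH_HEADER) or last_hash
        if resp.status == httplib.CONFLICT:
            # Backend state diverged from the hash we sent: push everything.
            conn.request('PUT', '/topology', json.dumps(get_topology()),
                         headers)
            resp = conn.getresponse()
            resp.read()
            new_hash = resp.getheader(HASH_HEADER) or new_hash
        return resp, new_hash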

etc/neutron/plugins/bigswitch/restproxy.ini
neutron/db/migration/alembic_migrations/versions/81c553f3776c_bsn_consistencyhashes.py [new file with mode: 0644]
neutron/plugins/bigswitch/config.py
neutron/plugins/bigswitch/db/consistency_db.py [new file with mode: 0644]
neutron/plugins/bigswitch/plugin.py
neutron/plugins/bigswitch/servermanager.py
neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
neutron/tests/unit/bigswitch/fake_server.py
neutron/tests/unit/bigswitch/test_base.py
neutron/tests/unit/bigswitch/test_restproxy_plugin.py
neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py

diff --git a/etc/neutron/plugins/bigswitch/restproxy.ini b/etc/neutron/plugins/bigswitch/restproxy.ini
index 39065741e3da950085fda7bda8e38265c6a636d3..69c21c0c8e6ff2d7449e53509280160ce46f47a5 100644 (file)
@@ -8,7 +8,8 @@
 #   server_auth           :  <username:password>          (default: no auth)
 #   server_ssl            :  True | False                 (default: False)
 #   sync_data             :  True | False                 (default: False)
-#   server_timeout        :  <int>                        (default: 10 seconds)
+#   auto_sync_on_failure  :  True | False                 (default: True)
+#   server_timeout        :  <integer>                    (default: 10 seconds)
 #   neutron_id            :  <string>                     (default: neutron-<hostname>)
 #   add_meta_server_route :  True | False                 (default: True)
 #   thread_pool_size      :  <int>                        (default: 4)
@@ -25,6 +26,11 @@ servers=localhost:8080
 # Sync data on connect
 # sync_data=False
 
+# If neutron fails to create a resource because the backend controller
+# doesn't know of a dependency, automatically trigger a full data
+# synchronization to the controller.
+# auto_sync_on_failure=True
+
 # Maximum number of seconds to wait for proxy request to connect and complete.
 # server_timeout=10
 
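For reference, a minimal [restproxy] section enabling the new option might
look like the following. This is only an assumed example: the section name
and servers value mirror the existing file, and consistency_interval is the
companion option added in config.py below rather than something documented
in this ini hunk.

    [restproxy]
    servers=localhost:8080
    # Push a full topology sync when the backend reports a missing dependency.
    auto_sync_on_failure=True
    # Seconds between background consistency checks.
    consistency_interval=60
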
diff --git a/neutron/db/migration/alembic_migrations/versions/81c553f3776c_bsn_consistencyhashes.py b/neutron/db/migration/alembic_migrations/versions/81c553f3776c_bsn_consistencyhashes.py
new file mode 100644 (file)
index 0000000..4b14bb1
--- /dev/null
@@ -0,0 +1,55 @@
+# Copyright 2014 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""bsn_consistencyhashes
+
+Revision ID: 81c553f3776c
+Revises: 24c7ea5160d7
+Create Date: 2014-02-26 18:56:00.402855
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '81c553f3776c'
+down_revision = '24c7ea5160d7'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+    'neutron.plugins.bigswitch.plugin.NeutronRestProxyV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.create_table(
+        'consistencyhashes',
+        sa.Column('hash_id', sa.String(255), primary_key=True),
+        sa.Column('hash', sa.String(255), nullable=False)
+    )
+
+
+def downgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.drop_table('consistencyhashes')
diff --git a/neutron/plugins/bigswitch/config.py b/neutron/plugins/bigswitch/config.py
index 2a50c9a7a7eb0db49b73dd91d441ea7a59ddec1f..ae7e0526e255f681202bf8db88414520de6be230 100644 (file)
@@ -44,6 +44,14 @@ restproxy_opts = [
                        "Floodlight controller.")),
     cfg.BoolOpt('sync_data', default=False,
                 help=_("Sync data on connect")),
+    cfg.BoolOpt('auto_sync_on_failure', default=True,
+                help=_("If neutron fails to create a resource because "
+                       "the backend controller doesn't know of a dependency, "
+                       "automatically trigger a full data synchronization "
+                       "to the controller.")),
+    cfg.IntOpt('consistency_interval', default=60,
+               help=_("Time between verifications that the backend controller "
+                      "database is consistent with Neutron")),
     cfg.IntOpt('server_timeout', default=10,
                help=_("Maximum number of seconds to wait for proxy request "
                       "to connect and complete.")),
diff --git a/neutron/plugins/bigswitch/db/consistency_db.py b/neutron/plugins/bigswitch/db/consistency_db.py
new file mode 100644 (file)
index 0000000..cd89a26
--- /dev/null
@@ -0,0 +1,56 @@
+# Copyright 2014, Big Switch Networks
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import sqlalchemy as sa
+
+from neutron.db import api as db
+from neutron.db import model_base
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+'''
+A simple table to store the latest consistency hash
+received from a server in case neutron gets restarted.
+'''
+
+
+class ConsistencyHash(model_base.BASEV2):
+    '''
+    For now we only support one global state so the
+    hash_id will always be '1'
+    '''
+    __tablename__ = 'consistencyhashes'
+    hash_id = sa.Column(sa.String(255),
+                        primary_key=True)
+    hash = sa.Column(sa.String(255), nullable=False)
+
+
+def get_consistency_hash(hash_id='1'):
+    session = db.get_session()
+    with session.begin(subtransactions=True):
+        query = session.query(ConsistencyHash)
+        res = query.filter_by(hash_id=hash_id).first()
+    if not res:
+        return False
+    return res.hash
+
+
+def put_consistency_hash(hash, hash_id='1'):
+    session = db.get_session()
+    with session.begin(subtransactions=True):
+        conhash = ConsistencyHash(hash_id=hash_id, hash=hash)
+        session.merge(conhash)
+        LOG.debug(_("Consistency hash for group %(hash_id)s updated "
+                    "to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
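
A brief usage sketch of this module, assuming it runs inside Neutron with a
configured database connection; servermanager.py below is the real consumer,
and the example hash value here is invented.

    from neutron.plugins.bigswitch.db import consistency_db as cdb

    stored = cdb.get_consistency_hash()   # returns False when no row exists
    cdb.put_consistency_hash('abc123')    # upserts the row with hash_id '1'
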
diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py
index 32b273d7989f60fe84939d9ce46e8ce6f7340aff..785e3b0b351451b8784a1ca0063cd71af7c953f8 100644 (file)
@@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
 """
 
 import copy
+import httplib
 import re
 
 import eventlet
@@ -172,13 +173,8 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
         if not self.servers:
             LOG.warning(_("ServerPool not set!"))
 
-    def _send_all_data(self, send_ports=True, send_floating_ips=True,
-                       send_routers=True):
-        """Pushes all data to network ctrl (networks/ports, ports/attachments).
-
-        This gives the controller an option to re-sync it's persistent store
-        with neutron's current view of that data.
-        """
+    def _get_all_data(self, get_ports=True, get_floating_ips=True,
+                      get_routers=True):
         admin_context = qcontext.get_admin_context()
         networks = []
 
@@ -186,11 +182,11 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
         for net in all_networks:
             mapped_network = self._get_mapped_network_with_subnets(net)
             flips_n_ports = {}
-            if send_floating_ips:
+            if get_floating_ips:
                 flips_n_ports = self._get_network_with_floatingips(
                     mapped_network)
 
-            if send_ports:
+            if get_ports:
                 ports = []
                 net_filter = {'network_id': [net.get('id')]}
                 net_ports = self.get_ports(admin_context,
@@ -209,12 +205,9 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
             if flips_n_ports:
                 networks.append(flips_n_ports)
 
-        resource = '/topology'
-        data = {
-            'networks': networks,
-        }
+        data = {'networks': networks}
 
-        if send_routers:
+        if get_routers:
             routers = []
             all_routers = self.get_routers(admin_context) or []
             for router in all_routers:
@@ -238,9 +231,21 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
                 routers.append(mapped_router)
 
             data.update({'routers': routers})
+        return data
 
+    def _send_all_data(self, send_ports=True, send_floating_ips=True,
+                       send_routers=True, timeout=None,
+                       triggered_by_tenant=None):
+        """Pushes all data to network ctrl (networks/ports, ports/attachments).
+
+        This gives the controller an option to re-sync it's persistent store
+        with neutron's current view of that data.
+        """
+        data = self._get_all_data(send_ports, send_floating_ips, send_routers)
+        data['triggered_by_tenant'] = triggered_by_tenant
         errstr = _("Unable to update remote topology: %s")
-        return self.servers.rest_action('PUT', resource, data, errstr)
+        return self.servers.rest_action('PUT', servermanager.TOPOLOGY_PATH,
+                                        data, errstr, timeout=timeout)
 
     def _get_network_with_floatingips(self, network, context=None):
         if context is None:
@@ -386,15 +391,38 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
         try:
             self.servers.rest_create_port(tenant_id, net_id, port)
         except servermanager.RemoteRestError as e:
-            LOG.error(
-                _("NeutronRestProxyV2: Unable to create port: %s"), e)
-            try:
-                self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
-            except exceptions.PortNotFound:
-                # If port is already gone from DB and there was an error
-                # creating on the backend, everything is already consistent
-                pass
-            return
+            # 404 should never be received on a port create unless
+            # there are inconsistencies between the data in neutron
+            # and the data in the backend.
+            # Run a sync to get it consistent.
+            if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
+                e.status == httplib.NOT_FOUND and
+                servermanager.NXNETWORK in e.reason):
+                LOG.error(_("Inconsistency with backend controller, "
+                            "triggering full synchronization."))
+                # args depend on if we are operating in ML2 driver
+                # or as the full plugin
+                topoargs = self.servers.get_topo_function_args
+                self._send_all_data(
+                    send_ports=topoargs['get_ports'],
+                    send_floating_ips=topoargs['get_floating_ips'],
+                    send_routers=topoargs['get_routers'],
+                    triggered_by_tenant=tenant_id
+                )
+                # If the full sync worked, the port will be created
+                # on the controller so it can be safely marked as active
+            else:
+                # Any errors that don't result in a successful auto-sync
+                # require that the port be placed into the error state.
+                LOG.error(
+                    _("NeutronRestProxyV2: Unable to create port: %s"), e)
+                try:
+                    self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
+                except exceptions.PortNotFound:
+                    # If port is already gone from DB and there was an error
+                    # creating on the backend, everything is already consistent
+                    pass
+                return
         new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
                       else const.PORT_STATUS_DOWN)
         try:
@@ -448,6 +476,10 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 
         # init network ctrl connections
         self.servers = servermanager.ServerPool(server_timeout)
+        self.servers.get_topo_function = self._get_all_data
+        self.servers.get_topo_function_args = {'get_ports': True,
+                                               'get_floating_ips': True,
+                                               'get_routers': True}
 
         self.network_scheduler = importutils.import_object(
             cfg.CONF.network_scheduler_driver
diff --git a/neutron/plugins/bigswitch/servermanager.py b/neutron/plugins/bigswitch/servermanager.py
index 7534be17ffa2c2e4e99072c53948374478d4db3f..dd182d71a613e35a0abbf7521be2a1b5ed6957da 100644 (file)
@@ -34,13 +34,15 @@ import base64
 import httplib
 import json
 import socket
+import time
 
+import eventlet
 from oslo.config import cfg
 
 from neutron.common import exceptions
 from neutron.common import utils
 from neutron.openstack.common import log as logging
-
+from neutron.plugins.bigswitch.db import consistency_db as cdb
 
 LOG = logging.getLogger(__name__)
 
@@ -56,23 +58,34 @@ PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
 ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
 ROUTERS_PATH = "/tenants/%s/routers/%s"
 ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
+TOPOLOGY_PATH = "/topology"
+HEALTH_PATH = "/health"
 SUCCESS_CODES = range(200, 207)
 FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
                  504, 505]
 BASE_URI = '/networkService/v1.1'
 ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
+HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
+# error messages
+NXNETWORK = 'NXVNS'
 
 
 class RemoteRestError(exceptions.NeutronException):
     message = _("Error in REST call to remote network "
                 "controller: %(reason)s")
+    status = None
+
+    def __init__(self, **kwargs):
+        self.status = kwargs.pop('status', None)
+        self.reason = kwargs.get('reason')
+        super(RemoteRestError, self).__init__(**kwargs)
 
 
 class ServerProxy(object):
     """REST server proxy to a network controller."""
 
     def __init__(self, server, port, ssl, auth, neutron_id, timeout,
-                 base_uri, name):
+                 base_uri, name, mypool):
         self.server = server
         self.port = port
         self.ssl = ssl
@@ -84,6 +97,8 @@ class ServerProxy(object):
         self.neutron_id = neutron_id
         self.failed = False
         self.capabilities = []
+        # enable server to reference parent pool
+        self.mypool = mypool
         if auth:
             self.auth = 'Basic ' + base64.encodestring(auth).strip()
 
@@ -99,7 +114,7 @@ class ServerProxy(object):
                                                 'cap': self.capabilities})
         return self.capabilities
 
-    def rest_call(self, action, resource, data='', headers=None):
+    def rest_call(self, action, resource, data='', headers={}, timeout=None):
         uri = self.base_uri + resource
         body = json.dumps(data)
         if not headers:
@@ -109,6 +124,7 @@ class ServerProxy(object):
         headers['NeutronProxy-Agent'] = self.name
         headers['Instance-ID'] = self.neutron_id
         headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
+        headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
         if self.auth:
             headers['Authorization'] = self.auth
 
@@ -121,16 +137,17 @@ class ServerProxy(object):
                    'action': action})
 
         conn = None
+        timeout = timeout or self.timeout
         if self.ssl:
             conn = httplib.HTTPSConnection(
-                self.server, self.port, timeout=self.timeout)
+                self.server, self.port, timeout=timeout)
             if conn is None:
                 LOG.error(_('ServerProxy: Could not establish HTTPS '
                             'connection'))
                 return 0, None, None, None
         else:
             conn = httplib.HTTPConnection(
-                self.server, self.port, timeout=self.timeout)
+                self.server, self.port, timeout=timeout)
             if conn is None:
                 LOG.error(_('ServerProxy: Could not establish HTTP '
                             'connection'))
@@ -139,6 +156,9 @@ class ServerProxy(object):
         try:
             conn.request(action, uri, body, headers)
             response = conn.getresponse()
+            newhash = response.getheader(HASH_MATCH_HEADER)
+            if newhash:
+                self._put_consistency_hash(newhash)
             respstr = response.read()
             respdata = respstr
             if response.status in self.success_codes:
@@ -160,6 +180,10 @@ class ServerProxy(object):
                                                     'data': ret[3]})
         return ret
 
+    def _put_consistency_hash(self, newhash):
+        self.mypool.consistency_hash = newhash
+        cdb.put_consistency_hash(newhash)
+
 
 class ServerPool(object):
 
@@ -180,6 +204,17 @@ class ServerPool(object):
         if timeout is not None:
             self.timeout = timeout
 
+        # Function to use to retrieve topology for consistency syncs.
+        # Needs to be set by module that uses the servermanager.
+        self.get_topo_function = None
+        self.get_topo_function_args = {}
+
+        # Hash to send to backend with request as expected previous
+        # state to verify consistency.
+        self.consistency_hash = cdb.get_consistency_hash()
+        eventlet.spawn(self._consistency_watchdog,
+                       cfg.CONF.RESTPROXY.consistency_interval)
+
         if not servers:
             raise cfg.Error(_('Servers not defined. Aborting server manager.'))
         servers = [s if len(s.rsplit(':', 1)) == 2
@@ -210,7 +245,7 @@ class ServerPool(object):
 
     def server_proxy_for(self, server, port):
         return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
-                           self.timeout, self.base_uri, self.name)
+                           self.timeout, self.base_uri, self.name, mypool=self)
 
     def server_failure(self, resp, ignore_codes=[]):
         """Define failure codes as required.
@@ -228,10 +263,27 @@ class ServerPool(object):
         return resp[0] in SUCCESS_CODES
 
     @utils.synchronized('bsn-rest-call')
-    def rest_call(self, action, resource, data, headers, ignore_codes):
+    def rest_call(self, action, resource, data, headers, ignore_codes,
+                  timeout=None):
         good_first = sorted(self.servers, key=lambda x: x.failed)
+        first_response = None
         for active_server in good_first:
-            ret = active_server.rest_call(action, resource, data, headers)
+            ret = active_server.rest_call(action, resource, data, headers,
+                                          timeout)
+            # If inconsistent, do a full synchronization
+            if ret[0] == httplib.CONFLICT:
+                if not self.get_topo_function:
+                    raise cfg.Error(_('Server requires synchronization, '
+                                      'but no topology function was defined.'))
+                data = self.get_topo_function(**self.get_topo_function_args)
+                active_server.rest_call('PUT', TOPOLOGY_PATH, data,
+                                        timeout=None)
+            # Store the first response as the error to be bubbled up to the
+            # user since it was a good server. Subsequent servers will most
+            # likely be cluster slaves and won't have a useful error for the
+            # user (e.g. 302 redirect to master)
+            if not first_response:
+                first_response = ret
             if not self.server_failure(ret, ignore_codes):
                 active_server.failed = False
                 return ret
@@ -254,10 +306,10 @@ class ServerPool(object):
                   {'action': action,
                    'server': tuple((s.server,
                                     s.port) for s in self.servers)})
-        return (0, None, None, None)
+        return first_response
 
     def rest_action(self, action, resource, data='', errstr='%s',
-                    ignore_codes=[], headers=None):
+                    ignore_codes=[], headers={}, timeout=None):
         """
         Wrapper for rest_call that verifies success and raises a
         RemoteRestError on failure with a provided error string
@@ -266,10 +318,11 @@ class ServerPool(object):
         """
         if not ignore_codes and action == 'DELETE':
             ignore_codes = [404]
-        resp = self.rest_call(action, resource, data, headers, ignore_codes)
+        resp = self.rest_call(action, resource, data, headers, ignore_codes,
+                              timeout)
         if self.server_failure(resp, ignore_codes):
             LOG.error(errstr, resp[2])
-            raise RemoteRestError(reason=resp[2])
+            raise RemoteRestError(reason=resp[2], status=resp[0])
         if resp[0] in ignore_codes:
             LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
                           "code %(code)s on %(action)s action to resource "
@@ -361,3 +414,16 @@ class ServerPool(object):
         resource = FLOATINGIPS_PATH % (tenant_id, oldid)
         errstr = _("Unable to delete floating IP: %s")
         self.rest_action('DELETE', resource, errstr=errstr)
+
+    def _consistency_watchdog(self, polling_interval=60):
+        if 'consistency' not in self.get_capabilities():
+            LOG.warning(_("Backend server(s) do not support automated "
+                      "consistency checks."))
+            return
+        while True:
+            # If consistency is supported, all we have to do is make any
+            # rest call and the consistency header will be added. If it
+            # doesn't match, the backend will return a synchronization error
+            # that will be handled by the rest_call.
+            time.sleep(polling_interval)
+            self.servers.rest_call('GET', HEALTH_PATH)
diff --git a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
index e13d475bd8cca431eafd3ca349fbd8e4764dc6af..ced7b37ff1aee4de3304ef4ece4d766e74d2a595 100644 (file)
@@ -16,6 +16,8 @@
 #    under the License.
 #
 # @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
+# @author: Kevin Benton, Big Switch Networks, Inc.
+
 import eventlet
 from oslo.config import cfg
 
@@ -25,7 +27,7 @@ from neutron.openstack.common import log
 from neutron.plugins.bigswitch import config as pl_config
 from neutron.plugins.bigswitch.db import porttracker_db
 from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
-from neutron.plugins.bigswitch.servermanager import ServerPool
+from neutron.plugins.bigswitch import servermanager
 from neutron.plugins.ml2 import driver_api as api
 
 
@@ -51,7 +53,11 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
         self.native_bulk_support = False
 
         # init network ctrl connections
-        self.servers = ServerPool(server_timeout)
+        self.servers = servermanager.ServerPool(server_timeout)
+        self.servers.get_topo_function = self._get_all_data
+        self.servers.get_topo_function_args = {'get_ports': True,
+                                               'get_floating_ips': False,
+                                               'get_routers': False}
         self.segmentation_types = ', '.join(cfg.CONF.ml2.type_drivers)
         LOG.debug(_("Initialization done"))
 
@@ -102,6 +108,8 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
         prepped_port = self._map_state_and_status(prepped_port)
         if (portbindings.HOST_ID not in prepped_port or
             prepped_port[portbindings.HOST_ID] == ''):
+            LOG.warning(_("Ignoring port notification to controller because "
+                          "of missing host ID."))
             # in ML2, controller doesn't care about ports without
             # the host_id set
             return False
diff --git a/neutron/tests/unit/bigswitch/fake_server.py b/neutron/tests/unit/bigswitch/fake_server.py
index 6b21a1a7121a43973fcb420b0de010f3c7b34de7..ba4062c7f74a5e7590f3e03ac9236a571d3710cd 100644 (file)
@@ -20,6 +20,7 @@
 import json
 
 from neutron.openstack.common import log as logging
+from neutron.plugins.bigswitch import servermanager
 
 LOG = logging.getLogger(__name__)
 
@@ -35,13 +36,16 @@ class HTTPResponseMock():
     def read(self):
         return "{'status': '200 OK'}"
 
+    def getheader(self, header):
+        return None
+
 
 class HTTPResponseMock404(HTTPResponseMock):
     status = 404
     reason = 'Not Found'
 
     def read(self):
-        return "{'status': '404 Not Found'}"
+        return "{'status': '%s 404 Not Found'}" % servermanager.NXNETWORK
 
 
 class HTTPResponseMock500(HTTPResponseMock):
@@ -99,6 +103,13 @@ class HTTPConnectionMock(object):
         pass
 
 
+class HTTPConnectionMock404(HTTPConnectionMock):
+
+    def __init__(self, server, port, timeout):
+        self.response = HTTPResponseMock404(None)
+        self.broken = True
+
+
 class HTTPConnectionMock500(HTTPConnectionMock):
 
     def __init__(self, server, port, timeout):
diff --git a/neutron/tests/unit/bigswitch/test_base.py b/neutron/tests/unit/bigswitch/test_base.py
index c6da25290ff572f4c18f2812289c201ac36731bb..8f25631133838c2bb218cd95003122ea4a445a94 100644 (file)
@@ -29,8 +29,10 @@ RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
 NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
 CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
 CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
+SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
 HTTPCON = 'httplib.HTTPConnection'
 SPAWN = 'eventlet.GreenPool.spawn_n'
+CWATCH = SERVER_MANAGER + '.ServerPool._consistency_watchdog'
 
 
 class BigSwitchTestBase(object):
@@ -50,9 +52,11 @@ class BigSwitchTestBase(object):
         self.plugin_notifier_p = mock.patch(NOTIFIER)
         self.callbacks_p = mock.patch(CALLBACKS)
         self.spawn_p = mock.patch(SPAWN)
+        self.watch_p = mock.patch(CWATCH)
         self.addCleanup(mock.patch.stopall)
         self.addCleanup(db.clear_db)
         self.callbacks_p.start()
         self.plugin_notifier_p.start()
         self.httpPatch.start()
         self.spawn_p.start()
+        self.watch_p.start()
diff --git a/neutron/tests/unit/bigswitch/test_restproxy_plugin.py b/neutron/tests/unit/bigswitch/test_restproxy_plugin.py
index 72425f254605639db963948292325b3e66d22404..96141c5a36968862f76e8aa33be28c7a4415b1d8 100644 (file)
@@ -15,7 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import patch
+from contextlib import nested
+import mock
 from oslo.config import cfg
 import webob.exc
 
@@ -29,6 +30,8 @@ from neutron.tests.unit import test_api_v2
 import neutron.tests.unit.test_db_plugin as test_plugin
 import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
 
+patch = mock.patch
+
 
 class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
                                      test_plugin.NeutronDbPluginV2TestCase):
@@ -150,6 +153,27 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
                     res = req.get_response(self.api)
                     self.assertEqual(res.status_int, 200)
 
+    def test_create404_triggers_sync(self):
+        # allow async port thread for this patch
+        self.spawn_p.stop()
+        with nested(
+            self.subnet(),
+            patch('httplib.HTTPConnection', create=True,
+                  new=fake_server.HTTPConnectionMock404),
+            patch(test_base.RESTPROXY_PKG_PATH
+                  + '.NeutronRestProxyV2._send_all_data')
+        ) as (s, mock_http, mock_send_all):
+            with self.port(subnet=s, device_id='somedevid') as p:
+                # wait for the async port thread to finish
+                plugin = NeutronManager.get_plugin()
+                plugin.evpool.waitall()
+        call = mock.call(
+            send_routers=True, send_ports=True, send_floating_ips=True,
+            triggered_by_tenant=p['port']['tenant_id']
+        )
+        mock_send_all.assert_has_calls([call])
+        self.spawn_p.start()
+
 
 class TestBigSwitchProxyPortsV2IVS(test_plugin.TestPortsV2,
                                    BigSwitchProxyPluginV2TestCase,
diff --git a/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py b/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py
index 93e52f4d696d194ec35445c723415bf23ac93d71..c1d3c527797b2f00f1f56e974bd6e1fb9c7a9974 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from contextlib import nested
+import mock
 import webob.exc
 
 from neutron.extensions import portbindings
+from neutron.manager import NeutronManager
+from neutron.plugins.bigswitch import servermanager
 from neutron.plugins.ml2 import config as ml2_config
 from neutron.plugins.ml2.drivers import type_vlan as vlan_config
 import neutron.tests.unit.bigswitch.test_restproxy_plugin as trp
@@ -27,6 +31,9 @@ from neutron.tests.unit import test_db_plugin
 PHYS_NET = 'physnet1'
 VLAN_START = 1000
 VLAN_END = 1100
+SERVER_POOL = 'neutron.plugins.bigswitch.servermanager.ServerPool'
+DRIVER_MOD = 'neutron.plugins.ml2.drivers.mech_bigswitch.driver'
+DRIVER = DRIVER_MOD + '.BigSwitchMechanismDriver'
 
 
 class TestBigSwitchMechDriverBase(trp.BigSwitchProxyPluginV2TestCase):
@@ -93,3 +100,26 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
         if res.status_int >= 400:
             raise webob.exc.HTTPClientError(code=res.status_int)
         return self.deserialize(fmt, res)
+
+    def test_create404_triggers_background_sync(self):
+        # allow the async background thread to run for this test
+        self.spawn_p.stop()
+        with nested(
+            mock.patch(SERVER_POOL + '.rest_create_port',
+                       side_effect=servermanager.RemoteRestError(
+                           reason=servermanager.NXNETWORK, status=404)),
+            mock.patch(DRIVER + '._send_all_data'),
+            self.port(**{'device_id': 'devid', 'binding:host_id': 'host'})
+        ) as (mock_http, mock_send_all, p):
+            # wait for thread to finish
+            mm = NeutronManager.get_plugin().mechanism_manager
+            bigdriver = mm.mech_drivers['bigswitch'].obj
+            bigdriver.evpool.waitall()
+            mock_send_all.assert_has_calls([
+                mock.call(
+                    send_routers=False, send_ports=True,
+                    send_floating_ips=False,
+                    triggered_by_tenant=p['port']['tenant_id']
+                )
+            ])
+        self.spawn_p.start()