]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
BigSwitch: Asynchronous rest calls for port create
authorKevin Benton <blak111@gmail.com>
Tue, 18 Feb 2014 22:18:47 +0000 (22:18 +0000)
committerThomas Goirand <thomas@goirand.fr>
Thu, 13 Mar 2014 07:20:38 +0000 (15:20 +0800)
Makes rest calls for port creation an async
operation so create_port calls immediately
return in a BUILD state.

Implements: blueprint bsn-port-async
Change-Id: Ib512a846fa878ec33205df08a3b2464b7ea0941a

etc/neutron/plugins/bigswitch/restproxy.ini
neutron/plugins/bigswitch/config.py
neutron/plugins/bigswitch/plugin.py
neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
neutron/tests/unit/bigswitch/test_base.py
neutron/tests/unit/bigswitch/test_restproxy_plugin.py

index a982010f637ff521c6548905f535c605f1e2786c..39065741e3da950085fda7bda8e38265c6a636d3 100644 (file)
@@ -8,10 +8,10 @@
 #   server_auth           :  <username:password>          (default: no auth)
 #   server_ssl            :  True | False                 (default: False)
 #   sync_data             :  True | False                 (default: False)
-#   server_timeout        :  10                           (default: 10 seconds)
+#   server_timeout        :  <int>                        (default: 10 seconds)
 #   neutron_id            :  <string>                     (default: neutron-<hostname>)
 #   add_meta_server_route :  True | False                 (default: True)
-#
+#   thread_pool_size      :  <int>                        (default: 4)
 
 # A comma separated list of BigSwitch or Floodlight servers and port numbers. The plugin proxies the requests to the BigSwitch/Floodlight server, which performs the networking configuration. Note that only one server is needed per deployment, but you may wish to deploy multiple servers to support failover.
 servers=localhost:8080
@@ -34,6 +34,9 @@ servers=localhost:8080
 # Flag to decide if a route to the metadata server should be injected into the VM
 # add_meta_server_route = True
 
+# Number of threads to use to handle large volumes of port creation requests
+# thread_pool_size = 4
+
 [nova]
 # Specify the VIF_TYPE that will be controlled on the Nova compute instances
 #    options: ivs or ovs
index 5094617b7914d82690459f85d74de3fa7d2f0a89..2a50c9a7a7eb0db49b73dd91d441ea7a59ddec1f 100644 (file)
@@ -47,6 +47,9 @@ restproxy_opts = [
     cfg.IntOpt('server_timeout', default=10,
                help=_("Maximum number of seconds to wait for proxy request "
                       "to connect and complete.")),
+    cfg.IntOpt('thread_pool_size', default=4,
+               help=_("Maximum number of threads to spawn to handle large "
+                      "volumes of port creations.")),
     cfg.StrOpt('neutron_id', default='neutron-' + utils.get_hostname(),
                deprecated_name='quantum_id',
                help=_("User defined identifier for this Neutron deployment")),
index 12a80955d4c2ace9a838f58837506d52c32e4394..267b896b2a3c9b068e16603509d0be11fd555633 100644 (file)
@@ -47,7 +47,9 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
 import copy
 import re
 
+import eventlet
 from oslo.config import cfg
+from sqlalchemy.orm import exc as sqlexc
 
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api import extensions as neutron_extensions
@@ -86,7 +88,6 @@ from neutron.plugins.bigswitch.version import version_string_with_vcs
 
 LOG = logging.getLogger(__name__)
 
-
 SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin')
 METADATA_SERVER_IP = '169.254.169.254'
 
@@ -379,6 +380,38 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
                     self).get_network(context, port["network_id"])
         return net['tenant_id']
 
+    def async_port_create(self, tenant_id, net_id, port):
+        try:
+            self.servers.rest_create_port(tenant_id, net_id, port)
+        except servermanager.RemoteRestError as e:
+            LOG.error(
+                _("NeutronRestProxyV2: Unable to create port: %s"), e)
+            try:
+                self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
+            except exceptions.PortNotFound:
+                # If port is already gone from DB and there was an error
+                # creating on the backend, everything is already consistent
+                pass
+            return
+        new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
+                      else const.PORT_STATUS_DOWN)
+        try:
+            self._set_port_status(port['id'], new_status)
+        except exceptions.PortNotFound:
+            # This port was deleted before the create made it to the controller
+            # so it now needs to be deleted since the normal delete request
+            # would have deleted an non-existent port.
+            self.servers.rest_delete_port(tenant_id, net_id, port['id'])
+
+    def _set_port_status(self, port_id, status):
+        session = db.get_session()
+        try:
+            port = session.query(models_v2.Port).filter_by(id=port_id).one()
+            port['status'] = status
+            session.flush()
+        except sqlexc.NoResultFound:
+            raise exceptions.PortNotFound(port_id=port_id)
+
 
 class NeutronRestProxyV2(NeutronRestProxyV2Base,
                          extradhcpopt_db.ExtraDhcpOptMixin,
@@ -403,6 +436,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
                  version_string_with_vcs())
         pl_config.register_config()
+        self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
 
         # Include the BigSwitch Extensions path in the api_extensions
         neutron_extensions.append_api_extensions_path(extensions.__path__)
@@ -584,29 +618,31 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
         with context.session.begin(subtransactions=True):
             self._ensure_default_security_group_on_port(context, port)
             sgids = self._get_security_groups_on_port(context, port)
+            # set port status to pending. updated after rest call completes
+            port['port']['status'] = const.PORT_STATUS_BUILD
             dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, [])
             new_port = super(NeutronRestProxyV2, self).create_port(context,
                                                                    port)
-            if (portbindings.HOST_ID in port['port']
-                and 'id' in new_port):
-                host_id = port['port'][portbindings.HOST_ID]
-                porttracker_db.put_port_hostid(context, new_port['id'],
-                                               host_id)
-            self._process_port_create_extra_dhcp_opts(context, new_port,
-                                                      dhcp_opts)
-            new_port = self._extend_port_dict_binding(context, new_port)
-            net_tenant_id = self._get_port_net_tenantid(context, new_port)
-            if self.add_meta_server_route:
-                if new_port['device_owner'] == 'network:dhcp':
-                    destination = METADATA_SERVER_IP + '/32'
-                    self._add_host_route(context, destination, new_port)
-
-            # create on network ctrl
-            mapped_port = self._map_state_and_status(new_port)
-            self.servers.rest_create_port(net_tenant_id,
-                                          new_port["network_id"],
-                                          mapped_port)
             self._process_port_create_security_group(context, new_port, sgids)
+        if (portbindings.HOST_ID in port['port']
+            and 'id' in new_port):
+            host_id = port['port'][portbindings.HOST_ID]
+            porttracker_db.put_port_hostid(context, new_port['id'],
+                                           host_id)
+        self._process_port_create_extra_dhcp_opts(context, new_port,
+                                                  dhcp_opts)
+        new_port = self._extend_port_dict_binding(context, new_port)
+        net = super(NeutronRestProxyV2,
+                    self).get_network(context, new_port["network_id"])
+        if self.add_meta_server_route:
+            if new_port['device_owner'] == 'network:dhcp':
+                destination = METADATA_SERVER_IP + '/32'
+                self._add_host_route(context, destination, new_port)
+
+        # create on network ctrl
+        mapped_port = self._map_state_and_status(new_port)
+        self.evpool.spawn_n(self.async_port_create, net["tenant_id"],
+                            new_port["network_id"], mapped_port)
         self.notify_security_groups_member_updated(context, new_port)
         return new_port
 
index 05143c307cf2e4f63e50ad7c5d14d4e52e4f0b36..e13d475bd8cca431eafd3ca349fbd8e4764dc6af 100644 (file)
@@ -16,7 +16,7 @@
 #    under the License.
 #
 # @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
-
+import eventlet
 from oslo.config import cfg
 
 from neutron import context as ctx
@@ -46,6 +46,7 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
 
         # register plugin config opts
         pl_config.register_config()
+        self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
         # backend doesn't support bulk operations yet
         self.native_bulk_support = False
 
@@ -70,8 +71,8 @@ class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
         # create port on the network controller
         port = self._prepare_port_for_controller(context)
         if port:
-            self.servers.rest_create_port(port["network"]["tenant_id"],
-                                          port["network"]["id"], port)
+            self.async_port_create(port["network"]["tenant_id"],
+                                   port["network"]["id"], port)
 
     def update_port_postcommit(self, context):
         # update port on the network controller
index 77cea72d038a56988b61ec7a2eec0b039eac472c..c6da25290ff572f4c18f2812289c201ac36731bb 100644 (file)
@@ -30,6 +30,7 @@ NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
 CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
 CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
 HTTPCON = 'httplib.HTTPConnection'
+SPAWN = 'eventlet.GreenPool.spawn_n'
 
 
 class BigSwitchTestBase(object):
@@ -48,8 +49,10 @@ class BigSwitchTestBase(object):
                                     new=fake_server.HTTPConnectionMock)
         self.plugin_notifier_p = mock.patch(NOTIFIER)
         self.callbacks_p = mock.patch(CALLBACKS)
+        self.spawn_p = mock.patch(SPAWN)
         self.addCleanup(mock.patch.stopall)
         self.addCleanup(db.clear_db)
         self.callbacks_p.start()
         self.plugin_notifier_p.start()
         self.httpPatch.start()
+        self.spawn_p.start()
index 831cccb0b0662de056acd5e85cdc8be6ae3f4bd0..69fb5b7b01f95b999f8dacdaa4b46a467b96b410 100644 (file)
@@ -39,6 +39,7 @@ class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
             self._plugin_name = plugin_name
         super(BigSwitchProxyPluginV2TestCase,
               self).setUp(self._plugin_name)
+        self.port_create_status = 'BUILD'
 
 
 class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet,
@@ -67,25 +68,31 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
     VIF_TYPE = portbindings.VIF_TYPE_OVS
     HAS_PORT_FILTER = False
 
+    def test_update_port_status_build(self):
+        with self.port() as port:
+            self.assertEqual(port['port']['status'], 'BUILD')
+            self.assertEqual(self.port_create_status, 'BUILD')
+
     def _get_ports(self, netid):
         return self.deserialize('json',
                                 self._list_ports('json', netid=netid))['ports']
 
     def test_rollback_for_port_create(self):
-        with self.network(no_delete=True) as n:
+        plugin = NeutronManager.get_plugin()
+        with self.subnet() as s:
             self.httpPatch = patch('httplib.HTTPConnection', create=True,
                                    new=fake_server.HTTPConnectionMock500)
             self.httpPatch.start()
-            kwargs = {'device_id': 'somedevid',
-                      'tenant_id': n['network']['tenant_id']}
-            self._create_port('json', n['network']['id'],
-                              expected_code=
-                              webob.exc.HTTPInternalServerError.code,
-                              **kwargs)
-            self.httpPatch.stop()
-            ports = self._get_ports(n['network']['id'])
-            #failure to create should result in no ports
-            self.assertEqual(0, len(ports))
+            kwargs = {'device_id': 'somedevid'}
+            # allow thread spawns for this patch
+            self.spawn_p.stop()
+            with self.port(subnet=s, **kwargs):
+                self.spawn_p.start()
+                plugin.evpool.waitall()
+                self.httpPatch.stop()
+                ports = self._get_ports(s['subnet']['network_id'])
+                #failure to create should result in port in error state
+                self.assertEqual(ports[0]['status'], 'ERROR')
 
     def test_rollback_for_port_update(self):
         with self.network() as n:
@@ -116,7 +123,7 @@ class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
                              webob.exc.HTTPInternalServerError.code)
                 self.httpPatch.stop()
                 port = self._get_ports(n['network']['id'])[0]
-                self.assertEqual('ACTIVE', port['status'])
+                self.assertEqual('BUILD', port['status'])
 
     def test_correct_shared_net_tenant_id(self):
         # tenant_id in port requests should match network tenant_id instead