# server_auth : <username:password> (default: no auth)
# server_ssl : True | False (default: False)
# sync_data : True | False (default: False)
-# server_timeout : 10 (default: 10 seconds)
+# server_timeout : <int> (default: 10 seconds)
# neutron_id : <string> (default: neutron-<hostname>)
# add_meta_server_route : True | False (default: True)
-#
+# thread_pool_size : <int> (default: 4)
# A comma separated list of BigSwitch or Floodlight servers and port numbers. The plugin proxies the requests to the BigSwitch/Floodlight server, which performs the networking configuration. Note that only one server is needed per deployment, but you may wish to deploy multiple servers to support failover.
servers=localhost:8080
# Flag to decide if a route to the metadata server should be injected into the VM
# add_meta_server_route = True
+# Number of threads to use to handle large volumes of port creation requests
+# thread_pool_size = 4
+
[nova]
# Specify the VIF_TYPE that will be controlled on the Nova compute instances
# options: ivs or ovs
cfg.IntOpt('server_timeout', default=10,
help=_("Maximum number of seconds to wait for proxy request "
"to connect and complete.")),
+ cfg.IntOpt('thread_pool_size', default=4,
+ help=_("Maximum number of threads to spawn to handle large "
+ "volumes of port creations.")),
cfg.StrOpt('neutron_id', default='neutron-' + utils.get_hostname(),
deprecated_name='quantum_id',
help=_("User defined identifier for this Neutron deployment")),
import copy
import re
+import eventlet
from oslo.config import cfg
+from sqlalchemy.orm import exc as sqlexc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api import extensions as neutron_extensions
LOG = logging.getLogger(__name__)
-
SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin')
METADATA_SERVER_IP = '169.254.169.254'
self).get_network(context, port["network_id"])
return net['tenant_id']
+ def async_port_create(self, tenant_id, net_id, port):
+ try:
+ self.servers.rest_create_port(tenant_id, net_id, port)
+ except servermanager.RemoteRestError as e:
+ LOG.error(
+ _("NeutronRestProxyV2: Unable to create port: %s"), e)
+ try:
+ self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
+ except exceptions.PortNotFound:
+ # If port is already gone from DB and there was an error
+ # creating on the backend, everything is already consistent
+ pass
+ return
+ new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
+ else const.PORT_STATUS_DOWN)
+ try:
+ self._set_port_status(port['id'], new_status)
+ except exceptions.PortNotFound:
+ # This port was deleted before the create made it to the controller
+ # so it now needs to be deleted since the normal delete request
+ # would have deleted an non-existent port.
+ self.servers.rest_delete_port(tenant_id, net_id, port['id'])
+
+ def _set_port_status(self, port_id, status):
+ session = db.get_session()
+ try:
+ port = session.query(models_v2.Port).filter_by(id=port_id).one()
+ port['status'] = status
+ session.flush()
+ except sqlexc.NoResultFound:
+ raise exceptions.PortNotFound(port_id=port_id)
+
class NeutronRestProxyV2(NeutronRestProxyV2Base,
extradhcpopt_db.ExtraDhcpOptMixin,
LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
version_string_with_vcs())
pl_config.register_config()
+ self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
# Include the BigSwitch Extensions path in the api_extensions
neutron_extensions.append_api_extensions_path(extensions.__path__)
with context.session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
+ # set port status to pending. updated after rest call completes
+ port['port']['status'] = const.PORT_STATUS_BUILD
dhcp_opts = port['port'].get(edo_ext.EXTRADHCPOPTS, [])
new_port = super(NeutronRestProxyV2, self).create_port(context,
port)
- if (portbindings.HOST_ID in port['port']
- and 'id' in new_port):
- host_id = port['port'][portbindings.HOST_ID]
- porttracker_db.put_port_hostid(context, new_port['id'],
- host_id)
- self._process_port_create_extra_dhcp_opts(context, new_port,
- dhcp_opts)
- new_port = self._extend_port_dict_binding(context, new_port)
- net_tenant_id = self._get_port_net_tenantid(context, new_port)
- if self.add_meta_server_route:
- if new_port['device_owner'] == 'network:dhcp':
- destination = METADATA_SERVER_IP + '/32'
- self._add_host_route(context, destination, new_port)
-
- # create on network ctrl
- mapped_port = self._map_state_and_status(new_port)
- self.servers.rest_create_port(net_tenant_id,
- new_port["network_id"],
- mapped_port)
self._process_port_create_security_group(context, new_port, sgids)
+ if (portbindings.HOST_ID in port['port']
+ and 'id' in new_port):
+ host_id = port['port'][portbindings.HOST_ID]
+ porttracker_db.put_port_hostid(context, new_port['id'],
+ host_id)
+ self._process_port_create_extra_dhcp_opts(context, new_port,
+ dhcp_opts)
+ new_port = self._extend_port_dict_binding(context, new_port)
+ net = super(NeutronRestProxyV2,
+ self).get_network(context, new_port["network_id"])
+ if self.add_meta_server_route:
+ if new_port['device_owner'] == 'network:dhcp':
+ destination = METADATA_SERVER_IP + '/32'
+ self._add_host_route(context, destination, new_port)
+
+ # create on network ctrl
+ mapped_port = self._map_state_and_status(new_port)
+ self.evpool.spawn_n(self.async_port_create, net["tenant_id"],
+ new_port["network_id"], mapped_port)
self.notify_security_groups_member_updated(context, new_port)
return new_port
# under the License.
#
# @author: Sumit Naiksatam, sumitnaiksatam@gmail.com, Big Switch Networks, Inc.
-
+import eventlet
from oslo.config import cfg
from neutron import context as ctx
# register plugin config opts
pl_config.register_config()
+ self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
# backend doesn't support bulk operations yet
self.native_bulk_support = False
# create port on the network controller
port = self._prepare_port_for_controller(context)
if port:
- self.servers.rest_create_port(port["network"]["tenant_id"],
- port["network"]["id"], port)
+ self.async_port_create(port["network"]["tenant_id"],
+ port["network"]["id"], port)
def update_port_postcommit(self, context):
# update port on the network controller
CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
HTTPCON = 'httplib.HTTPConnection'
+SPAWN = 'eventlet.GreenPool.spawn_n'
class BigSwitchTestBase(object):
new=fake_server.HTTPConnectionMock)
self.plugin_notifier_p = mock.patch(NOTIFIER)
self.callbacks_p = mock.patch(CALLBACKS)
+ self.spawn_p = mock.patch(SPAWN)
self.addCleanup(mock.patch.stopall)
self.addCleanup(db.clear_db)
self.callbacks_p.start()
self.plugin_notifier_p.start()
self.httpPatch.start()
+ self.spawn_p.start()
self._plugin_name = plugin_name
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name)
+ self.port_create_status = 'BUILD'
class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet,
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = False
+ def test_update_port_status_build(self):
+ with self.port() as port:
+ self.assertEqual(port['port']['status'], 'BUILD')
+ self.assertEqual(self.port_create_status, 'BUILD')
+
def _get_ports(self, netid):
return self.deserialize('json',
self._list_ports('json', netid=netid))['ports']
def test_rollback_for_port_create(self):
- with self.network(no_delete=True) as n:
+ plugin = NeutronManager.get_plugin()
+ with self.subnet() as s:
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock500)
self.httpPatch.start()
- kwargs = {'device_id': 'somedevid',
- 'tenant_id': n['network']['tenant_id']}
- self._create_port('json', n['network']['id'],
- expected_code=
- webob.exc.HTTPInternalServerError.code,
- **kwargs)
- self.httpPatch.stop()
- ports = self._get_ports(n['network']['id'])
- #failure to create should result in no ports
- self.assertEqual(0, len(ports))
+ kwargs = {'device_id': 'somedevid'}
+ # allow thread spawns for this patch
+ self.spawn_p.stop()
+ with self.port(subnet=s, **kwargs):
+ self.spawn_p.start()
+ plugin.evpool.waitall()
+ self.httpPatch.stop()
+ ports = self._get_ports(s['subnet']['network_id'])
+ #failure to create should result in port in error state
+ self.assertEqual(ports[0]['status'], 'ERROR')
def test_rollback_for_port_update(self):
with self.network() as n:
webob.exc.HTTPInternalServerError.code)
self.httpPatch.stop()
port = self._get_ports(n['network']['id'])[0]
- self.assertEqual('ACTIVE', port['status'])
+ self.assertEqual('BUILD', port['status'])
def test_correct_shared_net_tenant_id(self):
# tenant_id in port requests should match network tenant_id instead