# child processes as workers. The parent process manages them.
# api_workers = 0
+# Number of separate RPC worker processes to spawn. The default, 0, runs the
+# worker thread in the current process. Greater than 0 launches that number of
+# child processes as RPC workers. The parent process manages them.
+# rpc_workers = 0
+
# Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
# starting API server. Not supported on OS X.
# tcp_keepidle = 600
:param id: UUID representing the port to delete.
"""
pass
+
+ def start_rpc_listener(self):
+ """Start the rpc listener.
+
+ Most plugins start an RPC listener implicitly on initialization. In
+ order to support multiple process RPC, the plugin needs to expose
+ control over when this is started.
+
+ .. note:: this method is optional, as it was not part of the originally
+ defined plugin API.
+ """
+ raise NotImplementedError
ack_on_error)
def consume_in_thread(self):
- self.connection.consume_in_thread()
+ return self.connection.consume_in_thread()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
+
+ def start_rpc_listener(self):
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- self.conn.consume_in_thread()
+ return self.conn.consume_in_thread()
def _process_provider_segment(self, segment):
network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
from neutron import service
from neutron.openstack.common import gettextutils
+from neutron.openstack.common import log as logging
gettextutils.install('neutron', lazy=True)
+LOG = logging.getLogger(__name__)
+
def main():
eventlet.monkey_patch()
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
" the '--config-file' option!"))
try:
- neutron_service = service.serve_wsgi(service.NeutronApiService)
- neutron_service.wait()
+ pool = eventlet.GreenPool()
+
+ neutron_api = service.serve_wsgi(service.NeutronApiService)
+ api_thread = pool.spawn(neutron_api.wait)
+
+ try:
+ neutron_rpc = service.serve_rpc()
+ except NotImplementedError:
+ LOG.info(_("RPC was already started in parent process by plugin."))
+ else:
+ rpc_thread = pool.spawn(neutron_rpc.wait)
+
+ # api and rpc should die together. When one dies, kill the other.
+ rpc_thread.link(lambda gt: api_thread.kill())
+ api_thread.link(lambda gt: rpc_thread.kill())
+
+ pool.waitall()
+ except KeyboardInterrupt:
+ pass
except RuntimeError as e:
sys.exit(_("ERROR: %s") % e)
# License for the specific language governing permissions and limitations
# under the License.
+import eventlet
import inspect
import logging as std_logging
import os
from neutron.common import config
from neutron.common import legacy
from neutron import context
+from neutron import manager
+from neutron import neutron_plugin_base_v2
+from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import service
+from neutron.openstack.common.service import ProcessLauncher
from neutron import wsgi
cfg.IntOpt('api_workers',
default=0,
help=_('Number of separate worker processes for service')),
+ cfg.IntOpt('rpc_workers',
+ default=0,
+ help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
return service
+class RpcWorker(object):
+ """Wraps a worker to be handled by ProcessLauncher"""
+ def __init__(self, plugin):
+ self._plugin = plugin
+ self._server = None
+
+ def start(self):
+ # We may have just forked from parent process. A quick disposal of the
+ # existing sql connections avoids producing errors later when they are
+ # discovered to be broken.
+ session.get_engine(sqlite_fk=True).pool.dispose()
+ self._server = self._plugin.start_rpc_listener()
+
+ def wait(self):
+ if isinstance(self._server, eventlet.greenthread.GreenThread):
+ self._server.wait()
+
+ def stop(self):
+ if isinstance(self._server, eventlet.greenthread.GreenThread):
+ self._server.kill()
+ self._server = None
+
+
+def serve_rpc():
+ plugin = manager.NeutronManager.get_plugin()
+
+ # If 0 < rpc_workers then start_rpc_listener would be called in a
+ # subprocess and we cannot simply catch the NotImplementedError. It is
+ # simpler to check this up front by testing whether the plugin overrides
+ # start_rpc_listener.
+ base = neutron_plugin_base_v2.NeutronPluginBaseV2
+ if plugin.__class__.start_rpc_listener == base.start_rpc_listener:
+ LOG.debug(_("Active plugin doesn't implement start_rpc_listener"))
+ if 0 < cfg.CONF.rpc_workers:
+ msg = _("'rpc_workers = %d' ignored because start_rpc_listener "
+ "is not implemented.")
+ LOG.error(msg, cfg.CONF.rpc_workers)
+ raise NotImplementedError
+
+ try:
+ rpc = RpcWorker(plugin)
+
+ if cfg.CONF.rpc_workers < 1:
+ rpc.start()
+ return rpc
+ else:
+ launcher = ProcessLauncher(wait_interval=1.0)
+ launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
+ return launcher
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_('Unrecoverable error: please check log '
+ 'for details.'))
+
+
def _run_wsgi(app_name):
app = config.load_paste_app(app_name)
if not app:
super(PortBindingTestCase, self).setUp(PLUGIN_NAME)
self.port_create_status = 'DOWN'
self.plugin = manager.NeutronManager.get_plugin()
+ self.plugin.start_rpc_listener()
def _check_response(self, port, vif_type, has_port_filter, bound):
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
+ def setUp(self):
+ super(TestMl2SecurityGroups, self).setUp()
+ plugin = manager.NeutronManager.get_plugin()
+ plugin.start_rpc_listener()
+
def test_security_group_get_port_from_device(self):
with self.network() as n:
with self.subnet(n):