From deb27d9c24d3745ff062bc437081bd67cf10059f Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Mon, 2 Jun 2014 17:40:38 +0200 Subject: [PATCH] Port to oslo.messaging Now that all preparations are done, actually port the code to use oslo.messaging. This patch does as little as possible. Follow up patches that refactor and cleanup the code and configuration files, will be merged later. The reason for this is to make the patch as slim as possible, to make review process more smooth and concentrated. Details: * neutron/common/rpc.py: - added init() and cleanup() to set global RPC layer state. - added utility functions: get_server(), get_client(), get_notifier() that wrap up oslo.messaging API a bit, enforcing eventlet executor and setting serializer, among other things. - removed PluginRpcDispatcher, instead introduced PluginRpcSerializer to use as a default serializer for API callbacks. * neutron/common/rpc_compat.py: - emulated incubator RPC layer behaviour thru previously introduced stub classes (RpcCallback, RpcProxy, ...) using new oslo.messaging API. - switched to using new oslo.messaging exception types. * neutron/service.py: - expect multiple RPC listeners that are of MessageHandlingServer type, not GreenThread. * neutron/common/config.py: - initialize RPC layer in init() * setup.cfg: - added entry points for old notifier drivers to retain backward compatibility. * neutron/tests/...: - introduced fake_notifier to replace impl_fake. - faked out consume_in_thread() to avoid starting RPC listeners when running unit tests. - used 'fake' transport driver. - made sure neutron.test.* exceptions are caught. - initialize and clean up RPC layer for each test case. * Ported all affected code from using neutron.openstack.common.notifier API to oslo.messaging.Notifier. * rpc.set_defaults() was renamed to rpc.set_transport_defaults() * other changes not worth mentioning here. blueprint oslo-messaging DocImpact Change-Id: I5a91c34df6e300f2dc46217b1b16352fcc3039fc --- neutron/api/v2/base.py | 30 ++--- neutron/cmd/usage_audit.py | 33 ++--- neutron/common/config.py | 9 +- neutron/common/rpc.py | 113 ++++++++++++++++-- neutron/common/rpc_compat.py | 84 +++++++++++-- neutron/db/l3_db.py | 18 ++- neutron/db/metering/metering_rpc.py | 3 +- neutron/openstack/common/service.py | 4 +- .../bigswitch/agent/restproxy_agent.py | 3 +- neutron/plugins/bigswitch/plugin.py | 4 +- neutron/plugins/brocade/NeutronPlugin.py | 4 +- .../plugins/cisco/n1kv/n1kv_neutron_plugin.py | 4 +- .../hyperv/agent/hyperv_neutron_agent.py | 6 +- neutron/plugins/hyperv/rpc_callbacks.py | 4 +- .../plugins/ibm/agent/sdnve_neutron_agent.py | 3 +- neutron/plugins/ibm/sdnve_neutron_plugin.py | 4 +- .../agent/linuxbridge_neutron_agent.py | 3 +- .../plugins/linuxbridge/lb_neutron_plugin.py | 4 +- neutron/plugins/midonet/plugin.py | 4 +- neutron/plugins/ml2/rpc.py | 12 +- .../mlnx/agent/eswitch_neutron_agent.py | 3 +- neutron/plugins/mlnx/rpc_callbacks.py | 4 +- .../plugins/nec/agent/nec_neutron_agent.py | 4 +- neutron/plugins/nec/common/config.py | 1 - neutron/plugins/nec/nec_plugin.py | 15 ++- .../ofagent/agent/ofa_neutron_agent.py | 3 +- .../agent/nvsd_neutron_agent.py | 4 +- neutron/plugins/oneconvergence/plugin.py | 4 +- .../openvswitch/agent/ovs_neutron_agent.py | 3 +- .../plugins/openvswitch/ovs_neutron_plugin.py | 4 +- .../plugins/ryu/agent/ryu_neutron_agent.py | 3 +- neutron/plugins/ryu/ryu_neutron_plugin.py | 3 +- neutron/plugins/vmware/dhcp_meta/rpc.py | 4 +- neutron/policy.py | 4 +- neutron/service.py | 18 +-- neutron/services/firewall/fwaas_plugin.py | 3 +- .../services/l3_router/l3_router_plugin.py | 3 +- .../drivers/common/agent_driver_base.py | 4 +- .../metering/agents/metering_agent.py | 9 +- .../vpn/device_drivers/cisco_ipsec.py | 11 +- neutron/services/vpn/device_drivers/ipsec.py | 10 +- .../vpn/service_drivers/cisco_ipsec.py | 3 +- neutron/services/vpn/service_drivers/ipsec.py | 3 +- neutron/tests/base.py | 46 ++++--- neutron/tests/fake_notifier.py | 50 ++++++++ .../tests/unit/hyperv/test_hyperv_rpcapi.py | 14 +-- neutron/tests/unit/linuxbridge/test_rpcapi.py | 11 +- neutron/tests/unit/ml2/test_rpcapi.py | 13 +- neutron/tests/unit/mlnx/test_rpcapi.py | 11 +- .../tests/unit/openvswitch/test_ovs_rpcapi.py | 11 +- .../services/metering/test_metering_agent.py | 6 +- neutron/tests/unit/test_agent_rpc.py | 2 +- neutron/tests/unit/test_api_v2.py | 68 +++++------ neutron/tests/unit/test_l3_plugin.py | 8 +- requirements.txt | 1 + setup.cfg | 8 ++ 56 files changed, 439 insertions(+), 282 deletions(-) create mode 100644 neutron/tests/fake_notifier.py diff --git a/neutron/api/v2/base.py b/neutron/api/v2/base.py index 2ed735e27..89ef47ed6 100644 --- a/neutron/api/v2/base.py +++ b/neutron/api/v2/base.py @@ -27,8 +27,8 @@ from neutron.api.v2 import attributes from neutron.api.v2 import resource as wsgi_resource from neutron.common import constants as const from neutron.common import exceptions +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron import policy from neutron import quota @@ -69,7 +69,7 @@ class Controller(object): self._native_sorting = self._is_native_sorting_supported() self._policy_attrs = [name for (name, info) in self._attr_info.items() if info.get('required_by_policy')] - self._publisher_id = notifier_api.publisher_id('network') + self._notifier = n_rpc.get_notifier('network') # use plugin's dhcp notifier, if this is already instantiated agent_notifiers = getattr(plugin, 'agent_notifiers', {}) self._dhcp_agent_notifier = ( @@ -372,10 +372,8 @@ class Controller(object): def create(self, request, body=None, **kwargs): """Creates a new instance of the requested entity.""" parent_id = kwargs.get(self._parent_id_name) - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.create.start', - notifier_api.CONF.default_notification_level, body) body = Controller.prepare_request_body(request.context, body, True, self._resource, self._attr_info, @@ -419,10 +417,8 @@ class Controller(object): def notify(create_result): notifier_method = self._resource + '.create.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, create_result) self._send_dhcp_notification(request.context, create_result, @@ -458,10 +454,8 @@ class Controller(object): def delete(self, request, id, **kwargs): """Deletes the specified entity.""" - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.delete.start', - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) action = self._plugin_handlers[self.DELETE] @@ -482,10 +476,8 @@ class Controller(object): obj_deleter = getattr(self._plugin, action) obj_deleter(request.context, id, **kwargs) notifier_method = self._resource + '.delete.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) result = {self._resource: self._view(request.context, obj)} self._send_nova_notification(action, {}, result) @@ -502,10 +494,8 @@ class Controller(object): msg = _("Invalid format: %s") % request.body raise exceptions.BadRequest(resource='body', msg=msg) payload['id'] = id - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.update.start', - notifier_api.CONF.default_notification_level, payload) body = Controller.prepare_request_body(request.context, body, False, self._resource, self._attr_info, @@ -541,11 +531,7 @@ class Controller(object): obj = obj_updater(request.context, id, **kwargs) result = {self._resource: self._view(request.context, obj)} notifier_method = self._resource + '.update.end' - notifier_api.notify(request.context, - self._publisher_id, - notifier_method, - notifier_api.CONF.default_notification_level, - result) + self._notifier.info(request.context, notifier_method, result) self._send_dhcp_notification(request.context, result, notifier_method) diff --git a/neutron/cmd/usage_audit.py b/neutron/cmd/usage_audit.py index f48e0c691..6294d710d 100644 --- a/neutron/cmd/usage_audit.py +++ b/neutron/cmd/usage_audit.py @@ -26,9 +26,9 @@ import sys from oslo.config import cfg from neutron.common import config +from neutron.common import rpc as n_rpc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api def main(): @@ -37,33 +37,14 @@ def main(): cxt = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() + notifier = n_rpc.get_notifier('network') for network in plugin.get_networks(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'network.exists', - notifier_api.INFO, - {'network': network}) + notifier.info(cxt, 'network.exists', {'network': network}) for subnet in plugin.get_subnets(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'subnet.exists', - notifier_api.INFO, - {'subnet': subnet}) + notifier.info(cxt, 'subnet.exists', {'subnet': subnet}) for port in plugin.get_ports(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'port.exists', - notifier_api.INFO, - {'port': port}) + notifier.info(cxt, 'port.exists', {'port': port}) for router in plugin.get_routers(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'router.exists', - notifier_api.INFO, - {'router': router}) + notifier.info(cxt, 'router.exists', {'router': router}) for floatingip in plugin.get_floatingips(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'floatingip.exists', - notifier_api.INFO, - {'floatingip': floatingip}) + notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip}) diff --git a/neutron/common/config.py b/neutron/common/config.py index a7b7a9559..0a8232fa0 100644 --- a/neutron/common/config.py +++ b/neutron/common/config.py @@ -20,13 +20,13 @@ Routines for configuring Neutron import os from oslo.config import cfg +from oslo import messaging from paste import deploy from neutron.api.v2 import attributes from neutron.common import utils from neutron.openstack.common.db import options as db_options from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc from neutron import version @@ -125,7 +125,7 @@ cfg.CONF.register_opts(core_opts) cfg.CONF.register_cli_opts(core_cli_opts) # Ensure that the control exchange is set correctly -rpc.set_defaults(control_exchange='neutron') +messaging.set_transport_defaults(control_exchange='neutron') _SQL_CONNECTION_DEFAULT = 'sqlite://' # Update the default QueuePool parameters. These can be tweaked by the # configuration variables - max_pool_size, max_overflow and pool_timeout @@ -139,6 +139,11 @@ def init(args, **kwargs): version='%%prog %s' % version.version_info.release_string(), **kwargs) + # FIXME(ihrachys): if import is put in global, circular import + # failure occurs + from neutron.common import rpc as n_rpc + n_rpc.init(cfg.CONF) + # Validate that the base_mac is of the correct format msg = attributes._validate_regex(cfg.CONF.base_mac, attributes.MAC_PATTERN) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 643cf5934..98d468140 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -15,31 +15,122 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging +from oslo.messaging import serializer as om_serializer + +from neutron.common import exceptions from neutron import context from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher LOG = logging.getLogger(__name__) -class PluginRpcDispatcher(dispatcher.RpcDispatcher): - """This class is used to convert RPC common context into +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exceptions.__name__, +] +EXTRA_EXMODS = [] + + +TRANSPORT_ALIASES = { + 'neutron.openstack.common.rpc.impl_fake': 'fake', + 'neutron.openstack.common.rpc.impl_qpid': 'qpid', + 'neutron.openstack.common.rpc.impl_kombu': 'rabbit', + 'neutron.openstack.common.rpc.impl_zmq': 'zmq', + 'neutron.rpc.impl_fake': 'fake', + 'neutron.rpc.impl_qpid': 'qpid', + 'neutron.rpc.impl_kombu': 'rabbit', + 'neutron.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + NOTIFIER = messaging.Notifier(TRANSPORT) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or cfg.CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) + + +class PluginRpcSerializer(om_serializer.Serializer): + """This serializer is used to convert RPC common context into Neutron Context. """ + def __init__(self, base): + super(PluginRpcSerializer, self).__init__() + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) - def __init__(self, callbacks): - super(PluginRpcDispatcher, self).__init__(callbacks) + def serialize_context(self, ctxt): + return ctxt.to_dict() - def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs): - rpc_ctxt_dict = rpc_ctxt.to_dict() + def deserialize_context(self, ctxt): + rpc_ctxt_dict = ctxt.copy() user_id = rpc_ctxt_dict.pop('user_id', None) if not user_id: user_id = rpc_ctxt_dict.pop('user', None) tenant_id = rpc_ctxt_dict.pop('tenant_id', None) if not tenant_id: tenant_id = rpc_ctxt_dict.pop('project_id', None) - neutron_ctxt = context.Context(user_id, tenant_id, - load_admin_roles=False, **rpc_ctxt_dict) - return super(PluginRpcDispatcher, self).dispatch( - neutron_ctxt, version, method, namespace, **kwargs) + return context.Context(user_id, tenant_id, + load_admin_roles=False, **rpc_ctxt_dict) diff --git a/neutron/common/rpc_compat.py b/neutron/common/rpc_compat.py index f494d5338..939551d49 100644 --- a/neutron/common/rpc_compat.py +++ b/neutron/common/rpc_compat.py @@ -13,24 +13,63 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging + +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import common as rpc_common -from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher -from neutron.openstack.common.rpc import proxy from neutron.openstack.common import service LOG = logging.getLogger(__name__) -class RpcProxy(proxy.RpcProxy): +class RpcProxy(object): ''' This class is created to facilitate migration from oslo-incubator RPC layer implementation to oslo.messaging and is intended to emulate RpcProxy class behaviour using oslo.messaging API once the migration is applied. ''' + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None): + self.topic = topic + target = messaging.Target(topic=topic, version=default_version) + self._client = n_rpc.get_client(target, version_cap=version_cap) + + def make_msg(self, method, **kwargs): + return {'method': method, + 'namespace': self.RPC_API_NAMESPACE, + 'args': kwargs} + + def call(self, context, msg, **kwargs): + return self.__call_rpc_method( + context, msg, rpc_method='call', **kwargs) + + def cast(self, context, msg, **kwargs): + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def fanout_cast(self, context, msg, **kwargs): + kwargs['fanout'] = True + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def __call_rpc_method(self, context, msg, **kwargs): + options = dict( + ((opt, kwargs[opt]) + for opt in ('fanout', 'timeout', 'topic', 'version') + if kwargs.get(opt)) + ) + if msg['namespace']: + options['namespace'] = msg['namespace'] + + if options: + callee = self._client.prepare(**options) + else: + callee = self._client + + func = getattr(callee, kwargs['rpc_method']) + return func(context, msg['method'], **msg['args']) class RpcCallback(object): @@ -40,6 +79,11 @@ class RpcCallback(object): callback version using oslo.messaging API once the migration is applied. ''' + RPC_API_VERSION = '1.0' + + def __init__(self): + super(RpcCallback, self).__init__() + self.target = messaging.Target(version=self.RPC_API_VERSION) class Service(service.Service): @@ -64,8 +108,7 @@ class Service(service.Service): LOG.debug("Creating Consumer connection for Service %s" % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], - self.serializer) + dispatcher = [self.manager] # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) @@ -93,11 +136,30 @@ class Service(service.Service): super(Service, self).stop() +class Connection(object): + + def __init__(self): + super(Connection, self).__init__() + self.servers = [] + + def create_consumer(self, topic, proxy, fanout=False): + target = messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=fanout) + server = n_rpc.get_server(target, proxy) + self.servers.append(server) + + def consume_in_thread(self): + for server in self.servers: + server.start() + return self.servers + + # functions -create_connection = rpc.create_connection +def create_connection(new=True): + return Connection() # exceptions -RPCException = rpc_common.RPCException -RemoteError = rpc_common.RemoteError -MessagingTimeout = rpc_common.Timeout +RPCException = messaging.MessagingException +RemoteError = messaging.RemoteError +MessagingTimeout = messaging.MessagingTimeout diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 547026277..5d2aa6e1a 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as l3_constants from neutron.common import exceptions as n_exc +from neutron.common import rpc as n_rpc from neutron.common import utils from neutron.db import model_base from neutron.db import models_v2 @@ -28,7 +29,6 @@ from neutron.extensions import external_net from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import uuidutils from neutron.plugins.common import constants @@ -481,11 +481,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.create', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.create', {'router_interface': info}) return info def _confirm_router_interface_not_in_use(self, context, router_id, @@ -560,11 +558,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': subnet['id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.delete', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.delete', {'router_interface': info}) return info def _get_floatingip(self, context, id): diff --git a/neutron/db/metering/metering_rpc.py b/neutron/db/metering/metering_rpc.py index 82e7d3dd1..b55a0cf4c 100644 --- a/neutron/db/metering/metering_rpc.py +++ b/neutron/db/metering/metering_rpc.py @@ -15,7 +15,6 @@ # under the License. from neutron.common import constants as consts -from neutron.common import rpc as p_rpc from neutron.common import utils from neutron import manager from neutron.openstack.common import log as logging @@ -32,7 +31,7 @@ class MeteringRpcCallbacks(object): self.meter_plugin = meter_plugin def create_rpc_dispatcher(self): - return p_rpc.PluginRpcDispatcher([self]) + return [self] def get_sync_data_metering(self, context, **kwargs): l3_plugin = manager.NeutronManager.get_service_plugins().get( diff --git a/neutron/openstack/common/service.py b/neutron/openstack/common/service.py index 79ae9bc5d..4575de4b4 100644 --- a/neutron/openstack/common/service.py +++ b/neutron/openstack/common/service.py @@ -45,7 +45,9 @@ from neutron.openstack.common import systemd from neutron.openstack.common import threadgroup -rpc = importutils.try_import('neutron.openstack.common.rpc') +#rpc = importutils.try_import('neutron.openstack.common.rpc') +# TODO(ihrachys): restore once oslo-rpc code is removed from the tree +rpc = None CONF = cfg.CONF LOG = logging.getLogger(__name__) diff --git a/neutron/plugins/bigswitch/agent/restproxy_agent.py b/neutron/plugins/bigswitch/agent/restproxy_agent.py index a9c1e6653..6cdf5913b 100644 --- a/neutron/plugins/bigswitch/agent/restproxy_agent.py +++ b/neutron/plugins/bigswitch/agent/restproxy_agent.py @@ -36,7 +36,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import excutils from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.bigswitch import config as pl_config LOG = log.getLogger(__name__) @@ -106,7 +105,7 @@ class RestProxyAgent(rpc_compat.RpcCallback, self.topic = topics.AGENT self.plugin_rpc = PluginApi(topics.PLUGIN) self.context = q_context.get_admin_context_without_session() - self.dispatcher = dispatcher.RpcDispatcher([self]) + self.dispatcher = [self] consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] self.connection = agent_rpc.create_consumers(self.dispatcher, diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index 9249f5d6b..712f02b3c 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -57,7 +57,6 @@ from neutron.api import extensions as neutron_extensions from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.common import constants as const from neutron.common import exceptions -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -121,8 +120,7 @@ class RestProxyCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_port_from_device(self, device): port_id = re.sub(r"^tap", "", device) diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index fc1d1ad5d..5ec3fb401 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -31,7 +31,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -98,8 +97,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py index d3749f19d..e5c701e7d 100644 --- a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py +++ b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py @@ -28,7 +28,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -75,8 +74,7 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, diff --git a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py index 344735625..f76f751f8 100644 --- a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py +++ b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py @@ -38,7 +38,6 @@ from neutron.common import topics from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.hyperv.agent import utils from neutron.plugins.hyperv.agent import utilsfactory @@ -106,8 +105,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - rpc_callback = HyperVSecurityCallbackMixin(self) - return dispatcher.RpcDispatcher([rpc_callback]) + return [HyperVSecurityCallbackMixin(self)] class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback, @@ -236,7 +234,7 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback): segmentation_id, port['admin_state_up']) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _get_vswitch_name(self, network_type, physical_network): if network_type != p_const.TYPE_LOCAL: diff --git a/neutron/plugins/hyperv/rpc_callbacks.py b/neutron/plugins/hyperv/rpc_callbacks.py index dafc160e7..e967286d5 100644 --- a/neutron/plugins/hyperv/rpc_callbacks.py +++ b/neutron/plugins/hyperv/rpc_callbacks.py @@ -17,7 +17,6 @@ # @author: Alessandro Pilotti, Cloudbase Solutions Srl from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import dhcp_rpc_base @@ -48,8 +47,7 @@ class HyperVRpcCallbacks( If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_device_details(self, rpc_context, **kwargs): """Agent requests device details.""" diff --git a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py index 1a5190d90..b1fa1e8b6 100644 --- a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py +++ b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py @@ -37,7 +37,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ibm.common import config # noqa from neutron.plugins.ibm.common import constants @@ -156,7 +155,7 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback): "out-of-band") def create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def setup_integration_br(self, bridge_name, reset_br, out_of_band, controller_ip=None): diff --git a/neutron/plugins/ibm/sdnve_neutron_plugin.py b/neutron/plugins/ibm/sdnve_neutron_plugin.py index d3be17e51..8a6615f2e 100644 --- a/neutron/plugins/ibm/sdnve_neutron_plugin.py +++ b/neutron/plugins/ibm/sdnve_neutron_plugin.py @@ -23,7 +23,6 @@ from oslo.config import cfg from neutron.common import constants as n_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -54,8 +53,7 @@ class SdnveRpcCallbacks(): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def sdnve_info(self, rpc_context, **kwargs): '''Update new information.''' diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index d586b2eb9..5af3f674a 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -45,7 +45,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.linuxbridge.common import config # noqa from neutron.plugins.linuxbridge.common import constants as lconst @@ -816,7 +815,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] class LinuxBridgePluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 9af9a616d..61089f63c 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -72,8 +71,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/midonet/plugin.py b/neutron/plugins/midonet/plugin.py index 3902278e4..4495dda01 100644 --- a/neutron/plugins/midonet/plugin.py +++ b/neutron/plugins/midonet/plugin.py @@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as sa_exc from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -189,8 +188,7 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class MidonetPluginException(n_exc.NeutronException): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index ff4e6e7bc..e5068afb4 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,9 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -46,13 +47,15 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # 1.0 Initial version (from openvswitch/linuxbridge) # 1.1 Support Security Group RPC + # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to + # inheritance problems + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, notifier, type_manager): # REVISIT(kmestery): This depends on the first three super classes # not having their own __init__ functions. If an __init__() is added # to one, this could break. Fix this and add a unit test to cover this # test in H3. - # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due - # to inheritance problems super(RpcCallbacks, self).__init__(notifier, type_manager) def create_rpc_dispatcher(self): @@ -61,8 +64,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def _device_to_port_id(cls, device): diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index 90a97ce44..94fd2b89a 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -35,7 +35,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.mlnx.agent import utils from neutron.plugins.mlnx.common import config # noqa @@ -218,7 +217,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] class MlnxEswitchPluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/mlnx/rpc_callbacks.py b/neutron/plugins/mlnx/rpc_callbacks.py index fff970c43..0eda51436 100644 --- a/neutron/plugins/mlnx/rpc_callbacks.py +++ b/neutron/plugins/mlnx/rpc_callbacks.py @@ -17,7 +17,6 @@ from oslo.config import cfg from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import api as db_api @@ -48,8 +47,7 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of RPC messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/nec/agent/nec_neutron_agent.py b/neutron/plugins/nec/agent/nec_neutron_agent.py index 38b13b5b7..c1f580ac2 100755 --- a/neutron/plugins/nec/agent/nec_neutron_agent.py +++ b/neutron/plugins/nec/agent/nec_neutron_agent.py @@ -38,7 +38,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.nec.common import config @@ -157,8 +156,7 @@ class NECNeutronAgent(object): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec, - self.callback_sg]) + self.dispatcher = [self.callback_nec, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/nec/common/config.py b/neutron/plugins/nec/common/config.py index ed35dcb17..70f4a1a63 100644 --- a/neutron/plugins/nec/common/config.py +++ b/neutron/plugins/nec/common/config.py @@ -18,7 +18,6 @@ from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common import rpc # noqa from neutron.plugins.nec.common import constants as nconst diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index e36f9d63e..2bea5c04e 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -22,7 +22,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes as attrs from neutron.common import constants as const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -147,12 +146,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, # NOTE: callback_sg is referred to from the sg unit test. self.callback_sg = SecurityGroupServerRpcCallback() - callbacks = [NECPluginV2RPCCallbacks(self.safe_reference), - DhcpRpcCallback(), - L3RpcCallback(), - self.callback_sg, - agents_db.AgentExtRpcCallback()] - self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks) + self.dispatcher = [ + NECPluginV2RPCCallbacks(self.safe_reference), + DhcpRpcCallback(), + L3RpcCallback(), + self.callback_sg, + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) # Consume from all consumers in a thread @@ -722,7 +721,7 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self]) + return [self] def update_ports(self, rpc_context, **kwargs): """Update ports' information and activate/deavtivate them. diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 7ff3040b0..c79d77a91 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -39,7 +39,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.ofagent.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -351,7 +350,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] def _provision_local_vlan_outbound_for_tunnel(self, lvid, segmentation_id, ofports): diff --git a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py index d1d3daf6e..0ef6348df 100644 --- a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py +++ b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py @@ -32,7 +32,6 @@ from neutron.common import topics from neutron import context as n_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.oneconvergence.lib import config LOG = logging.getLogger(__name__) @@ -120,8 +119,7 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc, - self.callback_sg]) + self.dispatcher = [self.callback_oc, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/oneconvergence/plugin.py b/neutron/plugins/oneconvergence/plugin.py index 7d7af13b0..732ead70a 100644 --- a/neutron/plugins/oneconvergence/plugin.py +++ b/neutron/plugins/oneconvergence/plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const from neutron.common import exceptions as nexception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -61,8 +60,7 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback, def create_rpc_dispatcher(self): """Get the rpc dispatcher for this manager.""" - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @staticmethod def get_port_from_device(device): diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index e6a58567a..31c627484 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -41,7 +41,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -500,7 +499,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] def provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id): diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index 01867c416..5e3f387b0 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -80,8 +79,7 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/ryu/agent/ryu_neutron_agent.py b/neutron/plugins/ryu/agent/ryu_neutron_agent.py index 746a0c2f5..6086113c7 100755 --- a/neutron/plugins/ryu/agent/ryu_neutron_agent.py +++ b/neutron/plugins/ryu/agent/ryu_neutron_agent.py @@ -42,7 +42,6 @@ from neutron.common import topics from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ryu.common import config # noqa @@ -209,7 +208,7 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _setup_integration_br(self, root_helper, integ_br, tunnel_ip, ovsdb_port, ovsdb_ip): diff --git a/neutron/plugins/ryu/ryu_neutron_plugin.py b/neutron/plugins/ryu/ryu_neutron_plugin.py index 35065a41e..787ccb21c 100644 --- a/neutron/plugins/ryu/ryu_neutron_plugin.py +++ b/neutron/plugins/ryu/ryu_neutron_plugin.py @@ -23,7 +23,6 @@ from ryu.app import rest_nw_id from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as db @@ -59,7 +58,7 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback, self.ofp_rest_api_addr = ofp_rest_api_addr def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def get_ofp_rest_api(self, context, **kwargs): LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) diff --git a/neutron/plugins/vmware/dhcp_meta/rpc.py b/neutron/plugins/vmware/dhcp_meta/rpc.py index 057e94d97..c32a39b37 100644 --- a/neutron/plugins/vmware/dhcp_meta/rpc.py +++ b/neutron/plugins/vmware/dhcp_meta/rpc.py @@ -24,7 +24,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as const from neutron.common import exceptions as ntn_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import db_base_plugin_v2 @@ -55,8 +54,7 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def handle_network_dhcp_access(plugin, context, network, action): diff --git a/neutron/policy.py b/neutron/policy.py index 4c64432b6..747638287 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -26,7 +26,6 @@ from oslo.config import cfg from neutron.api.v2 import attributes from neutron.common import exceptions import neutron.common.utils as utils -from neutron import manager from neutron.openstack.common import excutils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging @@ -263,6 +262,9 @@ class OwnerCheck(policy.Check): # resource is handled by the core plugin. It might be worth # having a way to map resources to plugins so to make this # check more general + # FIXME(ihrachys): if import is put in global, circular + # import failure occurs + from neutron import manager f = getattr(manager.NeutronManager.get_instance().plugin, 'get_%s' % parent_res) # f *must* exist, if not found it is better to let neutron diff --git a/neutron/service.py b/neutron/service.py index 9b3073b5f..f14021769 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -13,13 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import inspect import logging as std_logging import os import random from oslo.config import cfg +from oslo.messaging import server as rpc_server from neutron.common import config from neutron.common import rpc_compat @@ -112,23 +112,25 @@ class RpcWorker(object): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin - self._server = None + self._servers = [] def start(self): # We may have just forked from parent process. A quick disposal of the # existing sql connections avoids producing errors later when they are # discovered to be broken. session.get_engine().pool.dispose() - self._server = self._plugin.start_rpc_listener() + self._servers = self._plugin.start_rpc_listener() def wait(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.wait() + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.wait() def stop(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.kill() - self._server = None + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.kill() + self._servers = [] def serve_rpc(): diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index fd2131e21..0238902f3 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -20,7 +20,6 @@ from oslo.config import cfg from neutron.common import exceptions as n_exception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron import context as neutron_context @@ -42,7 +41,7 @@ class FirewallCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def set_firewall_status(self, context, firewall_id, status, **kwargs): """Agent uses this to set a firewall's status.""" diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index c5505817d..29950c984 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -21,7 +21,6 @@ from oslo.config import cfg from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as qdbapi @@ -46,7 +45,7 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self]) + return [self] class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py index 8436cb835..85be0bacd 100644 --- a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -22,7 +22,6 @@ from oslo.config import cfg from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -66,8 +65,7 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher( - [self, agents_db.AgentExtRpcCallback(self.plugin)]) + return [self, agents_db.AgentExtRpcCallback(self.plugin)] def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py index ba1fe6bac..80883f41b 100644 --- a/neutron/services/metering/agents/metering_agent.py +++ b/neutron/services/metering/agents/metering_agent.py @@ -26,6 +26,7 @@ from neutron.agent.common import config from neutron.agent import rpc as agent_rpc from neutron.common import config as common_config from neutron.common import constants as constants +from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -34,7 +35,6 @@ from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import periodic_task from neutron.openstack.common import service from neutron import service as neutron_service @@ -114,11 +114,8 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager): 'host': self.host} LOG.debug(_("Send metering report: %s"), data) - notifier_api.notify(self.context, - notifier_api.publisher_id('metering'), - 'l3.meter', - notifier_api.CONF.default_notification_level, - data) + notifier = n_rpc.get_notifier('metering') + notifier.info(self.context, 'l3.meter', data) info['pkts'] = 0 info['bytes'] = 0 info['time'] = 0 diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index ba19460d7..12904f23e 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -20,10 +20,10 @@ import requests import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron import context as ctx from neutron.openstack.common import lockutils @@ -184,12 +184,13 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): # history # 1.0 Initial version - RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.host = host self.conn = rpc_compat.create_connection(new=True) context = ctx.get_admin_context_without_session() @@ -225,7 +226,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): for k, v in csrs_found.items()]) def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def vpnservice_updated(self, context, **kwargs): """Handle VPNaaS service driver change notifications.""" diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index 0d9ded960..2480eb272 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -23,11 +23,11 @@ import shutil import jinja2 import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.agent.linux import ip_lib from neutron.agent.linux import utils -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron import context from neutron.openstack.common import lockutils @@ -487,9 +487,11 @@ class IPsecDriver(device_drivers.DeviceDriver): RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.agent = agent self.conf = self.agent.conf self.root_helper = self.agent.root_helper @@ -514,7 +516,7 @@ class IPsecDriver(device_drivers.DeviceDriver): interval=self.conf.ipsec.ipsec_status_check_interval) def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def _update_nat(self, vpnservice, func): """Setting up nat rule in iptables. diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 856572319..c2b39da9e 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -16,7 +16,6 @@ import netaddr from netaddr import core as net_exc from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import excutils from neutron.openstack.common import log as logging @@ -55,7 +54,7 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Retuns info on the vpnservices on the host.""" diff --git a/neutron/services/vpn/service_drivers/ipsec.py b/neutron/services/vpn/service_drivers/ipsec.py index cf4b055d8..13b7c171b 100644 --- a/neutron/services/vpn/service_drivers/ipsec.py +++ b/neutron/services/vpn/service_drivers/ipsec.py @@ -16,7 +16,6 @@ # under the License. import netaddr -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import log as logging from neutron.services.vpn.common import topics @@ -42,7 +41,7 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Returns the vpnservices on the host.""" diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 87412f924..95034f653 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -29,15 +29,14 @@ import eventlet.timeout import fixtures import mock from oslo.config import cfg +from oslo.messaging import conffixture as messaging_conffixture import testtools from neutron.common import config +from neutron.common import rpc as n_rpc from neutron.db import agentschedulers_db from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api -from neutron.openstack.common.notifier import test_notifier -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import impl_fake +from neutron.tests import fake_notifier from neutron.tests import post_mortem_debug @@ -58,6 +57,10 @@ def fake_use_fatal_exceptions(*args): return True +def fake_consume_in_threads(self): + return [] + + class BaseTestCase(testtools.TestCase): def cleanup_core_plugin(self): @@ -90,16 +93,10 @@ class BaseTestCase(testtools.TestCase): if core_plugin is not None: cfg.CONF.set_override('core_plugin', core_plugin) - def _cleanup_test_notifier(self): - test_notifier.NOTIFICATIONS = [] - def setup_notification_driver(self, notification_driver=None): - # to reload the drivers - self.addCleanup(notifier_api._reset_drivers) - self.addCleanup(self._cleanup_test_notifier) - notifier_api._reset_drivers() + self.addCleanup(fake_notifier.reset) if notification_driver is None: - notification_driver = [test_notifier.__name__] + notification_driver = [fake_notifier.__name__] cfg.CONF.set_override("notification_driver", notification_driver) @staticmethod @@ -113,10 +110,6 @@ class BaseTestCase(testtools.TestCase): else: conf(args) - def _cleanup_rpc_backend(self): - rpc._RPCIMPL = None - impl_fake.CONSUMERS.clear() - def setUp(self): super(BaseTestCase, self).setUp() @@ -124,8 +117,6 @@ class BaseTestCase(testtools.TestCase): # test-specific cleanup has a chance to release references. self.addCleanup(self.cleanup_core_plugin) - self.addCleanup(self._cleanup_rpc_backend) - # Configure this first to ensure pm debugging support for setUp() if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING: self.addOnException(post_mortem_debug.exception_handler) @@ -179,6 +170,25 @@ class BaseTestCase(testtools.TestCase): 'neutron.common.exceptions.NeutronException.use_fatal_exceptions', fake_use_fatal_exceptions)) + # don't actually start RPC listeners when testing + self.useFixture(fixtures.MonkeyPatch( + 'neutron.common.rpc_compat.Connection.consume_in_thread', + fake_consume_in_threads)) + + self.useFixture(fixtures.MonkeyPatch( + 'oslo.messaging.Notifier', fake_notifier.FakeNotifier)) + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 15 + self.useFixture(self.messaging_conf) + + self.addCleanup(n_rpc.clear_extra_exmods) + n_rpc.add_extra_exmods('neutron.test') + + self.addCleanup(n_rpc.cleanup) + n_rpc.init(CONF) + if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml': raise self.skipException('XML Testing Skipped in Py26') diff --git a/neutron/tests/fake_notifier.py b/neutron/tests/fake_notifier.py new file mode 100644 index 000000000..012f3351e --- /dev/null +++ b/neutron/tests/fake_notifier.py @@ -0,0 +1,50 @@ +# Copyright 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + + +NOTIFICATIONS = [] + + +def reset(): + del NOTIFICATIONS[:] + + +FakeMessage = collections.namedtuple('Message', + ['publisher_id', 'priority', + 'event_type', 'payload']) + + +class FakeNotifier(object): + + def __init__(self, transport, publisher_id=None): + self.transport = transport + self.publisher_id = publisher_id + for priority in ('debug', 'info', 'warn', 'error', 'critical'): + setattr(self, priority, + functools.partial(self._notify, priority=priority.upper())) + + def prepare(self, publisher_id=None): + if publisher_id is None: + publisher_id = self.publisher_id + return self.__class__(self.transport, publisher_id) + + def _notify(self, ctxt, event_type, payload, priority): + msg = dict(publisher_id=self.publisher_id, + priority=priority, + event_type=event_type, + payload=payload) + NOTIFICATIONS.append(msg) diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 4af19fc54..965842738 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -23,9 +23,9 @@ Unit Tests for hyperv neutron rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.hyperv import agent_notifier_api as ana from neutron.plugins.hyperv.common import constants from neutron.tests import base @@ -38,19 +38,19 @@ class rpcHyperVApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: + proxy = rpc_compat.RpcProxy + with mock.patch.object(proxy, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = ana.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/linuxbridge/test_rpcapi.py b/neutron/tests/unit/linuxbridge/test_rpcapi.py index 762a65be1..616a06acd 100644 --- a/neutron/tests/unit/linuxbridge/test_rpcapi.py +++ b/neutron/tests/unit/linuxbridge/test_rpcapi.py @@ -35,7 +35,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -49,15 +48,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = plb.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a2d3bf0eb..af48a74f1 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -20,9 +20,9 @@ Unit Tests for ml2 rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2 import rpc as plugin_rpc from neutron.tests import base @@ -34,20 +34,19 @@ class RpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False + rpc = rpc_compat.RpcProxy with mock.patch.object(rpc, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/mlnx/test_rpcapi.py b/neutron/tests/unit/mlnx/test_rpcapi.py index 80dcf7827..ea34a840b 100644 --- a/neutron/tests/unit/mlnx/test_rpcapi.py +++ b/neutron/tests/unit/mlnx/test_rpcapi.py @@ -37,7 +37,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -51,15 +50,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py index 1b6a7370a..e8f75b9f4 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py +++ b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py @@ -34,7 +34,6 @@ class rpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -48,15 +47,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(arg, expected_arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = povs.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/services/metering/test_metering_agent.py b/neutron/tests/unit/services/metering/test_metering_agent.py index 3e1d0db29..b3e3511fe 100644 --- a/neutron/tests/unit/services/metering/test_metering_agent.py +++ b/neutron/tests/unit/services/metering/test_metering_agent.py @@ -18,10 +18,10 @@ import mock from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.services.metering.agents import metering_agent from neutron.tests import base +from neutron.tests import fake_notifier _uuid = uuidutils.generate_uuid @@ -96,8 +96,8 @@ class TestMeteringOperations(base.BaseTestCase): 'bytes': 444}} self.agent._metering_loop() - self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0) - for n in test_notifier.NOTIFICATIONS: + self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0) + for n in fake_notifier.NOTIFICATIONS: if n['event_type'] == 'l3.meter': break diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index bc4ae4a17..569a73956 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase): agent = rpc.PluginApi('fake_topic') ctxt = context.RequestContext('fake_user', 'fake_project') expect_val = 'foo' - with mock.patch('neutron.openstack.common.rpc.call') as rpc_call: + with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call: rpc_call.return_value = expect_val func_obj = getattr(agent, method) if method == 'tunnel_sync': diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py index c09dd21ee..38d54f7ca 100644 --- a/neutron/tests/unit/test_api_v2.py +++ b/neutron/tests/unit/test_api_v2.py @@ -33,12 +33,12 @@ from neutron.api.v2 import router from neutron.common import exceptions as n_exc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifer_api from neutron.openstack.common import policy as common_policy from neutron.openstack.common import uuidutils from neutron import policy from neutron import quota from neutron.tests import base +from neutron.tests import fake_notifier from neutron.tests.unit import testlib_api @@ -1242,41 +1242,42 @@ class V2Views(base.BaseTestCase): class NotificationTest(APIv2TestBase): - def _resource_op_notifier(self, opname, resource, expected_errors=False, - notification_level='INFO'): + + def setUp(self): + super(NotificationTest, self).setUp() + fake_notifier.reset() + + def _resource_op_notifier(self, opname, resource, expected_errors=False): initial_input = {resource: {'name': 'myname'}} instance = self.plugin.return_value instance.get_networks.return_value = initial_input instance.get_networks_count.return_value = 0 expected_code = exc.HTTPCreated.code - with mock.patch.object(notifer_api, 'notify') as mynotifier: - if opname == 'create': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.post_json( - _get_path('networks'), - initial_input, expect_errors=expected_errors) - if opname == 'update': - res = self.api.put_json( - _get_path('networks', id=_uuid()), - initial_input, expect_errors=expected_errors) - expected_code = exc.HTTPOk.code - if opname == 'delete': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.delete( - _get_path('networks', id=_uuid()), - expect_errors=expected_errors) - expected_code = exc.HTTPNoContent.code - expected = [mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".start", - notification_level, - mock.ANY), - mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".end", - notification_level, - mock.ANY)] - self.assertEqual(expected, mynotifier.call_args_list) + if opname == 'create': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.post_json( + _get_path('networks'), + initial_input, expect_errors=expected_errors) + if opname == 'update': + res = self.api.put_json( + _get_path('networks', id=_uuid()), + initial_input, expect_errors=expected_errors) + expected_code = exc.HTTPOk.code + if opname == 'delete': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.delete( + _get_path('networks', id=_uuid()), + expect_errors=expected_errors) + expected_code = exc.HTTPNoContent.code + + expected_events = ('.'.join([resource, opname, "start"]), + '.'.join([resource, opname, "end"])) + self.assertEqual(len(fake_notifier.NOTIFICATIONS), + len(expected_events)) + for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events): + self.assertEqual('INFO', msg['priority']) + self.assertEqual(event, msg['event_type']) + self.assertEqual(res.status_int, expected_code) def test_network_create_notifer(self): @@ -1288,11 +1289,6 @@ class NotificationTest(APIv2TestBase): def test_network_update_notifer(self): self._resource_op_notifier('update', 'network') - def test_network_create_notifer_with_log_level(self): - cfg.CONF.set_override('default_notification_level', 'DEBUG') - self._resource_op_notifier('create', 'network', - notification_level='DEBUG') - class DHCPNotificationTest(APIv2TestBase): def _test_dhcp_notifier(self, opname, resource, initial_input=None): diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index b02ba15f8..4eb80d0d3 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -38,9 +38,9 @@ from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.plugins.common import constants as service_constants +from neutron.tests import fake_notifier from neutron.tests.unit import test_agent_ext_plugin from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2_extension @@ -660,7 +660,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): 'subnet.create.end', 'router.interface.create', 'router.interface.delete'] - test_notifier.NOTIFICATIONS = [] + fake_notifier.reset() with self.router() as r: with self.subnet() as s: body = self._router_interface_action('add', @@ -683,9 +683,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): self.assertEqual( set(exp_notifications), - set(n['event_type'] for n in test_notifier.NOTIFICATIONS)) + set(n['event_type'] for n in fake_notifier.NOTIFICATIONS)) - for n in test_notifier.NOTIFICATIONS: + for n in fake_notifier.NOTIFICATIONS: if n['event_type'].startswith('router.interface.'): payload = n['payload']['router_interface'] self.assertIn('id', payload) diff --git a/requirements.txt b/requirements.txt index 5ba04f255..f34177ab2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,7 @@ alembic>=0.4.1 six>=1.7.0 stevedore>=0.14 oslo.config>=1.2.1 +oslo.messaging>=1.3.0 oslo.rootwrap python-novaclient>=2.17.0 diff --git a/setup.cfg b/setup.cfg index cc4db51b9..0eaaaed0f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -169,6 +169,14 @@ neutron.ml2.mechanism_drivers = fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver neutron.openstack.common.cache.backends = memory = neutron.openstack.common.cache._backends.memory:MemoryBackend +# These are for backwards compat with Icehouse notification_driver configuration values +oslo.messaging.notify.drivers = + neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver + neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver + neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver + neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver + neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver + [build_sphinx] all_files = 1 -- 2.45.2