]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Port to oslo.messaging
authorIhar Hrachyshka <ihrachys@redhat.com>
Mon, 2 Jun 2014 15:40:38 +0000 (17:40 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Thu, 19 Jun 2014 10:58:01 +0000 (12:58 +0200)
Now that all preparations are done, actually port the code to use
oslo.messaging. This patch does as little as possible. Follow up patches
that refactor and cleanup the code and configuration files, will be
merged later. The reason for this is to make the patch as slim as
possible, to make review process more smooth and concentrated.

Details:
* neutron/common/rpc.py:
  - added init() and cleanup() to set global RPC layer state.
  - added utility functions: get_server(), get_client(), get_notifier()
    that wrap up oslo.messaging API a bit, enforcing eventlet executor
    and setting serializer, among other things.
  - removed PluginRpcDispatcher, instead introduced PluginRpcSerializer
    to use as a default serializer for API callbacks.

* neutron/common/rpc_compat.py:
  - emulated incubator RPC layer behaviour thru previously introduced
    stub classes (RpcCallback, RpcProxy, ...) using new oslo.messaging
    API.
  - switched to using new oslo.messaging exception types.

* neutron/service.py:
  - expect multiple RPC listeners that are of MessageHandlingServer
    type, not GreenThread.

* neutron/common/config.py:
  - initialize RPC layer in init()

* setup.cfg:
  - added entry points for old notifier drivers to retain backward
    compatibility.

* neutron/tests/...:
  - introduced fake_notifier to replace impl_fake.
  - faked out consume_in_thread() to avoid starting RPC listeners when
    running unit tests.
  - used 'fake' transport driver.
  - made sure neutron.test.* exceptions are caught.
  - initialize and clean up RPC layer for each test case.

* Ported all affected code from using neutron.openstack.common.notifier
  API to oslo.messaging.Notifier.

* rpc.set_defaults() was renamed to rpc.set_transport_defaults()

* other changes not worth mentioning here.

blueprint oslo-messaging

DocImpact

Change-Id: I5a91c34df6e300f2dc46217b1b16352fcc3039fc

56 files changed:
neutron/api/v2/base.py
neutron/cmd/usage_audit.py
neutron/common/config.py
neutron/common/rpc.py
neutron/common/rpc_compat.py
neutron/db/l3_db.py
neutron/db/metering/metering_rpc.py
neutron/openstack/common/service.py
neutron/plugins/bigswitch/agent/restproxy_agent.py
neutron/plugins/bigswitch/plugin.py
neutron/plugins/brocade/NeutronPlugin.py
neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py
neutron/plugins/hyperv/agent/hyperv_neutron_agent.py
neutron/plugins/hyperv/rpc_callbacks.py
neutron/plugins/ibm/agent/sdnve_neutron_agent.py
neutron/plugins/ibm/sdnve_neutron_plugin.py
neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/linuxbridge/lb_neutron_plugin.py
neutron/plugins/midonet/plugin.py
neutron/plugins/ml2/rpc.py
neutron/plugins/mlnx/agent/eswitch_neutron_agent.py
neutron/plugins/mlnx/rpc_callbacks.py
neutron/plugins/nec/agent/nec_neutron_agent.py
neutron/plugins/nec/common/config.py
neutron/plugins/nec/nec_plugin.py
neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py
neutron/plugins/oneconvergence/plugin.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/plugins/openvswitch/ovs_neutron_plugin.py
neutron/plugins/ryu/agent/ryu_neutron_agent.py
neutron/plugins/ryu/ryu_neutron_plugin.py
neutron/plugins/vmware/dhcp_meta/rpc.py
neutron/policy.py
neutron/service.py
neutron/services/firewall/fwaas_plugin.py
neutron/services/l3_router/l3_router_plugin.py
neutron/services/loadbalancer/drivers/common/agent_driver_base.py
neutron/services/metering/agents/metering_agent.py
neutron/services/vpn/device_drivers/cisco_ipsec.py
neutron/services/vpn/device_drivers/ipsec.py
neutron/services/vpn/service_drivers/cisco_ipsec.py
neutron/services/vpn/service_drivers/ipsec.py
neutron/tests/base.py
neutron/tests/fake_notifier.py [new file with mode: 0644]
neutron/tests/unit/hyperv/test_hyperv_rpcapi.py
neutron/tests/unit/linuxbridge/test_rpcapi.py
neutron/tests/unit/ml2/test_rpcapi.py
neutron/tests/unit/mlnx/test_rpcapi.py
neutron/tests/unit/openvswitch/test_ovs_rpcapi.py
neutron/tests/unit/services/metering/test_metering_agent.py
neutron/tests/unit/test_agent_rpc.py
neutron/tests/unit/test_api_v2.py
neutron/tests/unit/test_l3_plugin.py
requirements.txt
setup.cfg

index 2ed735e277c4fe744857609545d126bc17c947d7..89ef47ed6de58e3cab92239ffbc900446cf67b99 100644 (file)
@@ -27,8 +27,8 @@ from neutron.api.v2 import attributes
 from neutron.api.v2 import resource as wsgi_resource
 from neutron.common import constants as const
 from neutron.common import exceptions
+from neutron.common import rpc as n_rpc
 from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import api as notifier_api
 from neutron import policy
 from neutron import quota
 
@@ -69,7 +69,7 @@ class Controller(object):
         self._native_sorting = self._is_native_sorting_supported()
         self._policy_attrs = [name for (name, info) in self._attr_info.items()
                               if info.get('required_by_policy')]
-        self._publisher_id = notifier_api.publisher_id('network')
+        self._notifier = n_rpc.get_notifier('network')
         # use plugin's dhcp notifier, if this is already instantiated
         agent_notifiers = getattr(plugin, 'agent_notifiers', {})
         self._dhcp_agent_notifier = (
@@ -372,10 +372,8 @@ class Controller(object):
     def create(self, request, body=None, **kwargs):
         """Creates a new instance of the requested entity."""
         parent_id = kwargs.get(self._parent_id_name)
-        notifier_api.notify(request.context,
-                            self._publisher_id,
+        self._notifier.info(request.context,
                             self._resource + '.create.start',
-                            notifier_api.CONF.default_notification_level,
                             body)
         body = Controller.prepare_request_body(request.context, body, True,
                                                self._resource, self._attr_info,
@@ -419,10 +417,8 @@ class Controller(object):
 
         def notify(create_result):
             notifier_method = self._resource + '.create.end'
-            notifier_api.notify(request.context,
-                                self._publisher_id,
+            self._notifier.info(request.context,
                                 notifier_method,
-                                notifier_api.CONF.default_notification_level,
                                 create_result)
             self._send_dhcp_notification(request.context,
                                          create_result,
@@ -458,10 +454,8 @@ class Controller(object):
 
     def delete(self, request, id, **kwargs):
         """Deletes the specified entity."""
-        notifier_api.notify(request.context,
-                            self._publisher_id,
+        self._notifier.info(request.context,
                             self._resource + '.delete.start',
-                            notifier_api.CONF.default_notification_level,
                             {self._resource + '_id': id})
         action = self._plugin_handlers[self.DELETE]
 
@@ -482,10 +476,8 @@ class Controller(object):
         obj_deleter = getattr(self._plugin, action)
         obj_deleter(request.context, id, **kwargs)
         notifier_method = self._resource + '.delete.end'
-        notifier_api.notify(request.context,
-                            self._publisher_id,
+        self._notifier.info(request.context,
                             notifier_method,
-                            notifier_api.CONF.default_notification_level,
                             {self._resource + '_id': id})
         result = {self._resource: self._view(request.context, obj)}
         self._send_nova_notification(action, {}, result)
@@ -502,10 +494,8 @@ class Controller(object):
             msg = _("Invalid format: %s") % request.body
             raise exceptions.BadRequest(resource='body', msg=msg)
         payload['id'] = id
-        notifier_api.notify(request.context,
-                            self._publisher_id,
+        self._notifier.info(request.context,
                             self._resource + '.update.start',
-                            notifier_api.CONF.default_notification_level,
                             payload)
         body = Controller.prepare_request_body(request.context, body, False,
                                                self._resource, self._attr_info,
@@ -541,11 +531,7 @@ class Controller(object):
         obj = obj_updater(request.context, id, **kwargs)
         result = {self._resource: self._view(request.context, obj)}
         notifier_method = self._resource + '.update.end'
-        notifier_api.notify(request.context,
-                            self._publisher_id,
-                            notifier_method,
-                            notifier_api.CONF.default_notification_level,
-                            result)
+        self._notifier.info(request.context, notifier_method, result)
         self._send_dhcp_notification(request.context,
                                      result,
                                      notifier_method)
index f48e0c691c3864d6e83c6613cc386a1b2669e5cc..6294d710d9b27fbcb6aa1c2bfd08d5650204e529 100644 (file)
@@ -26,9 +26,9 @@ import sys
 from oslo.config import cfg
 
 from neutron.common import config
+from neutron.common import rpc as n_rpc
 from neutron import context
 from neutron import manager
-from neutron.openstack.common.notifier import api as notifier_api
 
 
 def main():
@@ -37,33 +37,14 @@ def main():
 
     cxt = context.get_admin_context()
     plugin = manager.NeutronManager.get_plugin()
+    notifier = n_rpc.get_notifier('network')
     for network in plugin.get_networks(cxt):
-        notifier_api.notify(cxt,
-                            notifier_api.publisher_id('network'),
-                            'network.exists',
-                            notifier_api.INFO,
-                            {'network': network})
+        notifier.info(cxt, 'network.exists', {'network': network})
     for subnet in plugin.get_subnets(cxt):
-        notifier_api.notify(cxt,
-                            notifier_api.publisher_id('network'),
-                            'subnet.exists',
-                            notifier_api.INFO,
-                            {'subnet': subnet})
+        notifier.info(cxt, 'subnet.exists', {'subnet': subnet})
     for port in plugin.get_ports(cxt):
-        notifier_api.notify(cxt,
-                            notifier_api.publisher_id('network'),
-                            'port.exists',
-                            notifier_api.INFO,
-                            {'port': port})
+        notifier.info(cxt, 'port.exists', {'port': port})
     for router in plugin.get_routers(cxt):
-        notifier_api.notify(cxt,
-                            notifier_api.publisher_id('network'),
-                            'router.exists',
-                            notifier_api.INFO,
-                            {'router': router})
+        notifier.info(cxt, 'router.exists', {'router': router})
     for floatingip in plugin.get_floatingips(cxt):
-        notifier_api.notify(cxt,
-                            notifier_api.publisher_id('network'),
-                            'floatingip.exists',
-                            notifier_api.INFO,
-                            {'floatingip': floatingip})
+        notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip})
index a7b7a9559474724fda84d5f3aab1a43e908340fc..0a8232fa02befab12e0344c6a3696076e3673cef 100644 (file)
@@ -20,13 +20,13 @@ Routines for configuring Neutron
 import os
 
 from oslo.config import cfg
+from oslo import messaging
 from paste import deploy
 
 from neutron.api.v2 import attributes
 from neutron.common import utils
 from neutron.openstack.common.db import options as db_options
 from neutron.openstack.common import log as logging
-from neutron.openstack.common import rpc
 from neutron import version
 
 
@@ -125,7 +125,7 @@ cfg.CONF.register_opts(core_opts)
 cfg.CONF.register_cli_opts(core_cli_opts)
 
 # Ensure that the control exchange is set correctly
-rpc.set_defaults(control_exchange='neutron')
+messaging.set_transport_defaults(control_exchange='neutron')
 _SQL_CONNECTION_DEFAULT = 'sqlite://'
 # Update the default QueuePool parameters. These can be tweaked by the
 # configuration variables - max_pool_size, max_overflow and pool_timeout
@@ -139,6 +139,11 @@ def init(args, **kwargs):
              version='%%prog %s' % version.version_info.release_string(),
              **kwargs)
 
+    # FIXME(ihrachys): if import is put in global, circular import
+    # failure occurs
+    from neutron.common import rpc as n_rpc
+    n_rpc.init(cfg.CONF)
+
     # Validate that the base_mac is of the correct format
     msg = attributes._validate_regex(cfg.CONF.base_mac,
                                      attributes.MAC_PATTERN)
index 643cf5934401366c2decedcb8d3b2130d3bee181..98d46814044542504d6b3985dccd4bdcc3450693 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from oslo.config import cfg
+from oslo import messaging
+from oslo.messaging import serializer as om_serializer
+
+from neutron.common import exceptions
 from neutron import context
 from neutron.openstack.common import log as logging
-from neutron.openstack.common.rpc import dispatcher
 
 
 LOG = logging.getLogger(__name__)
 
 
-class PluginRpcDispatcher(dispatcher.RpcDispatcher):
-    """This class is used to convert RPC common context into
+TRANSPORT = None
+NOTIFIER = None
+
+ALLOWED_EXMODS = [
+    exceptions.__name__,
+]
+EXTRA_EXMODS = []
+
+
+TRANSPORT_ALIASES = {
+    'neutron.openstack.common.rpc.impl_fake': 'fake',
+    'neutron.openstack.common.rpc.impl_qpid': 'qpid',
+    'neutron.openstack.common.rpc.impl_kombu': 'rabbit',
+    'neutron.openstack.common.rpc.impl_zmq': 'zmq',
+    'neutron.rpc.impl_fake': 'fake',
+    'neutron.rpc.impl_qpid': 'qpid',
+    'neutron.rpc.impl_kombu': 'rabbit',
+    'neutron.rpc.impl_zmq': 'zmq',
+}
+
+
+def init(conf):
+    global TRANSPORT, NOTIFIER
+    exmods = get_allowed_exmods()
+    TRANSPORT = messaging.get_transport(conf,
+                                        allowed_remote_exmods=exmods,
+                                        aliases=TRANSPORT_ALIASES)
+    NOTIFIER = messaging.Notifier(TRANSPORT)
+
+
+def cleanup():
+    global TRANSPORT, NOTIFIER
+    assert TRANSPORT is not None
+    assert NOTIFIER is not None
+    TRANSPORT.cleanup()
+    TRANSPORT = NOTIFIER = None
+
+
+def add_extra_exmods(*args):
+    EXTRA_EXMODS.extend(args)
+
+
+def clear_extra_exmods():
+    del EXTRA_EXMODS[:]
+
+
+def get_allowed_exmods():
+    return ALLOWED_EXMODS + EXTRA_EXMODS
+
+
+def get_client(target, version_cap=None, serializer=None):
+    assert TRANSPORT is not None
+    serializer = PluginRpcSerializer(serializer)
+    return messaging.RPCClient(TRANSPORT,
+                               target,
+                               version_cap=version_cap,
+                               serializer=serializer)
+
+
+def get_server(target, endpoints, serializer=None):
+    assert TRANSPORT is not None
+    serializer = PluginRpcSerializer(serializer)
+    return messaging.get_rpc_server(TRANSPORT,
+                                    target,
+                                    endpoints,
+                                    executor='eventlet',
+                                    serializer=serializer)
+
+
+def get_notifier(service=None, host=None, publisher_id=None):
+    assert NOTIFIER is not None
+    if not publisher_id:
+        publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
+    return NOTIFIER.prepare(publisher_id=publisher_id)
+
+
+class PluginRpcSerializer(om_serializer.Serializer):
+    """This serializer is used to convert RPC common context into
     Neutron Context.
     """
+    def __init__(self, base):
+        super(PluginRpcSerializer, self).__init__()
+        self._base = base
+
+    def serialize_entity(self, ctxt, entity):
+        if not self._base:
+            return entity
+        return self._base.serialize_entity(ctxt, entity)
+
+    def deserialize_entity(self, ctxt, entity):
+        if not self._base:
+            return entity
+        return self._base.deserialize_entity(ctxt, entity)
 
-    def __init__(self, callbacks):
-        super(PluginRpcDispatcher, self).__init__(callbacks)
+    def serialize_context(self, ctxt):
+        return ctxt.to_dict()
 
-    def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
-        rpc_ctxt_dict = rpc_ctxt.to_dict()
+    def deserialize_context(self, ctxt):
+        rpc_ctxt_dict = ctxt.copy()
         user_id = rpc_ctxt_dict.pop('user_id', None)
         if not user_id:
             user_id = rpc_ctxt_dict.pop('user', None)
         tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
         if not tenant_id:
             tenant_id = rpc_ctxt_dict.pop('project_id', None)
-        neutron_ctxt = context.Context(user_id, tenant_id,
-                                       load_admin_roles=False, **rpc_ctxt_dict)
-        return super(PluginRpcDispatcher, self).dispatch(
-            neutron_ctxt, version, method, namespace, **kwargs)
+        return context.Context(user_id, tenant_id,
+                               load_admin_roles=False, **rpc_ctxt_dict)
index f494d533804f6e5a373e8a23538ed7eea8976eb6..939551d49300707d1895908b84eaf69c78f4e8b4 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from oslo.config import cfg
+from oslo import messaging
+
+from neutron.common import rpc as n_rpc
 from neutron.openstack.common import log as logging
-from neutron.openstack.common import rpc
-from neutron.openstack.common.rpc import common as rpc_common
-from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
-from neutron.openstack.common.rpc import proxy
 from neutron.openstack.common import service
 
 
 LOG = logging.getLogger(__name__)
 
 
-class RpcProxy(proxy.RpcProxy):
+class RpcProxy(object):
     '''
     This class is created to facilitate migration from oslo-incubator
     RPC layer implementation to oslo.messaging and is intended to
     emulate RpcProxy class behaviour using oslo.messaging API once the
     migration is applied.
     '''
+    RPC_API_NAMESPACE = None
+
+    def __init__(self, topic, default_version, version_cap=None):
+        self.topic = topic
+        target = messaging.Target(topic=topic, version=default_version)
+        self._client = n_rpc.get_client(target, version_cap=version_cap)
+
+    def make_msg(self, method, **kwargs):
+        return {'method': method,
+                'namespace': self.RPC_API_NAMESPACE,
+                'args': kwargs}
+
+    def call(self, context, msg, **kwargs):
+        return self.__call_rpc_method(
+            context, msg, rpc_method='call', **kwargs)
+
+    def cast(self, context, msg, **kwargs):
+        self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
+
+    def fanout_cast(self, context, msg, **kwargs):
+        kwargs['fanout'] = True
+        self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
+
+    def __call_rpc_method(self, context, msg, **kwargs):
+        options = dict(
+            ((opt, kwargs[opt])
+             for opt in ('fanout', 'timeout', 'topic', 'version')
+             if kwargs.get(opt))
+        )
+        if msg['namespace']:
+            options['namespace'] = msg['namespace']
+
+        if options:
+            callee = self._client.prepare(**options)
+        else:
+            callee = self._client
+
+        func = getattr(callee, kwargs['rpc_method'])
+        return func(context, msg['method'], **msg['args'])
 
 
 class RpcCallback(object):
@@ -40,6 +79,11 @@ class RpcCallback(object):
     callback version using oslo.messaging API once the migration is
     applied.
     '''
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self):
+        super(RpcCallback, self).__init__()
+        self.target = messaging.Target(version=self.RPC_API_VERSION)
 
 
 class Service(service.Service):
@@ -64,8 +108,7 @@ class Service(service.Service):
         LOG.debug("Creating Consumer connection for Service %s" %
                   self.topic)
 
-        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
-                                                  self.serializer)
+        dispatcher = [self.manager]
 
         # Share this same connection for these Consumers
         self.conn.create_consumer(self.topic, dispatcher, fanout=False)
@@ -93,11 +136,30 @@ class Service(service.Service):
         super(Service, self).stop()
 
 
+class Connection(object):
+
+    def __init__(self):
+        super(Connection, self).__init__()
+        self.servers = []
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        target = messaging.Target(
+            topic=topic, server=cfg.CONF.host, fanout=fanout)
+        server = n_rpc.get_server(target, proxy)
+        self.servers.append(server)
+
+    def consume_in_thread(self):
+        for server in self.servers:
+            server.start()
+        return self.servers
+
+
 # functions
-create_connection = rpc.create_connection
+def create_connection(new=True):
+    return Connection()
 
 
 # exceptions
-RPCException = rpc_common.RPCException
-RemoteError = rpc_common.RemoteError
-MessagingTimeout = rpc_common.Timeout
+RPCException = messaging.MessagingException
+RemoteError = messaging.RemoteError
+MessagingTimeout = messaging.MessagingTimeout
index 547026277a34192e05610c3797dea0049afce2bc..5d2aa6e1aaf038c4a4276cfbbe2691618dac80e6 100644 (file)
@@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.api.v2 import attributes
 from neutron.common import constants as l3_constants
 from neutron.common import exceptions as n_exc
+from neutron.common import rpc as n_rpc
 from neutron.common import utils
 from neutron.db import model_base
 from neutron.db import models_v2
@@ -28,7 +29,6 @@ from neutron.extensions import external_net
 from neutron.extensions import l3
 from neutron import manager
 from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import api as notifier_api
 from neutron.openstack.common import uuidutils
 from neutron.plugins.common import constants
 
@@ -481,11 +481,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 'tenant_id': port['tenant_id'],
                 'port_id': port['id'],
                 'subnet_id': port['fixed_ips'][0]['subnet_id']}
-        notifier_api.notify(context,
-                            notifier_api.publisher_id('network'),
-                            'router.interface.create',
-                            notifier_api.CONF.default_notification_level,
-                            {'router_interface': info})
+        notifier = n_rpc.get_notifier('network')
+        notifier.info(
+            context, 'router.interface.create', {'router_interface': info})
         return info
 
     def _confirm_router_interface_not_in_use(self, context, router_id,
@@ -560,11 +558,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 'tenant_id': port['tenant_id'],
                 'port_id': port['id'],
                 'subnet_id': subnet['id']}
-        notifier_api.notify(context,
-                            notifier_api.publisher_id('network'),
-                            'router.interface.delete',
-                            notifier_api.CONF.default_notification_level,
-                            {'router_interface': info})
+        notifier = n_rpc.get_notifier('network')
+        notifier.info(
+            context, 'router.interface.delete', {'router_interface': info})
         return info
 
     def _get_floatingip(self, context, id):
index 82e7d3dd1f992790ee03ec8e7fe0516983eef606..b55a0cf4c7e291d055df26ae37f4d931593c2437 100644 (file)
@@ -15,7 +15,6 @@
 # under the License.
 
 from neutron.common import constants as consts
-from neutron.common import rpc as p_rpc
 from neutron.common import utils
 from neutron import manager
 from neutron.openstack.common import log as logging
@@ -32,7 +31,7 @@ class MeteringRpcCallbacks(object):
         self.meter_plugin = meter_plugin
 
     def create_rpc_dispatcher(self):
-        return p_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def get_sync_data_metering(self, context, **kwargs):
         l3_plugin = manager.NeutronManager.get_service_plugins().get(
index 79ae9bc5d0dcae51be3244e2a116dc575bd732a3..4575de4b475c286e8c7a853253e34c1fc2a5e8d6 100644 (file)
@@ -45,7 +45,9 @@ from neutron.openstack.common import systemd
 from neutron.openstack.common import threadgroup
 
 
-rpc = importutils.try_import('neutron.openstack.common.rpc')
+#rpc = importutils.try_import('neutron.openstack.common.rpc')
+# TODO(ihrachys): restore once oslo-rpc code is removed from the tree
+rpc = None
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
index a9c1e6653a74cc5b52ff1527f5177ae0002b19fb..6cdf5913b0707f67be738b1c3a6be0ee4f9ba80d 100644 (file)
@@ -36,7 +36,6 @@ from neutron import context as q_context
 from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import excutils
 from neutron.openstack.common import log
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.bigswitch import config as pl_config
 
 LOG = log.getLogger(__name__)
@@ -106,7 +105,7 @@ class RestProxyAgent(rpc_compat.RpcCallback,
         self.topic = topics.AGENT
         self.plugin_rpc = PluginApi(topics.PLUGIN)
         self.context = q_context.get_admin_context_without_session()
-        self.dispatcher = dispatcher.RpcDispatcher([self])
+        self.dispatcher = [self]
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
         self.connection = agent_rpc.create_consumers(self.dispatcher,
index 9249f5d6b0215d65ea15e47fe5ae7e37c3395721..712f02b3c30f6beaa4c5312e83eb651c77648910 100644 (file)
@@ -57,7 +57,6 @@ from neutron.api import extensions as neutron_extensions
 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.common import constants as const
 from neutron.common import exceptions
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -121,8 +120,7 @@ class RestProxyCallbacks(rpc_compat.RpcCallback,
     RPC_API_VERSION = '1.1'
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     def get_port_from_device(self, device):
         port_id = re.sub(r"^tap", "", device)
index fc1d1ad5d79bfd9fb250ebb873d705ac9b89c482..5ec3fb40169d947bf6f231575da9923ab05e5970 100644 (file)
@@ -31,7 +31,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -98,8 +97,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         """
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @classmethod
     def get_port_from_device(cls, device):
index d3749f19dab819a3b39d51b36da411875edcff97..e5c701e7d24893d1a3f0efe3a7e13cb48e6cc4db 100644 (file)
@@ -28,7 +28,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.api.v2 import attributes
 from neutron.common import constants
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -75,8 +74,7 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         """
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
 
 class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
index 34473562570d2ca76f4c3ec86d3a5301fff3de7c..f76f751f843dd2f856975d3e48d97eddc7951e92 100644 (file)
@@ -38,7 +38,6 @@ from neutron.common import topics
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.hyperv.agent import utils
 from neutron.plugins.hyperv.agent import utilsfactory
@@ -106,8 +105,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback,
                                                      consumers)
 
     def _create_rpc_dispatcher(self):
-        rpc_callback = HyperVSecurityCallbackMixin(self)
-        return dispatcher.RpcDispatcher([rpc_callback])
+        return [HyperVSecurityCallbackMixin(self)]
 
 
 class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
@@ -236,7 +234,7 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback):
             segmentation_id, port['admin_state_up'])
 
     def _create_rpc_dispatcher(self):
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
     def _get_vswitch_name(self, network_type, physical_network):
         if network_type != p_const.TYPE_LOCAL:
index dafc160e7590e7a28f7f3cee7e0b45f8f52003c6..e967286d58578315fe388f00fc31f27dd03ba5f9 100644 (file)
@@ -17,7 +17,6 @@
 # @author: Alessandro Pilotti, Cloudbase Solutions Srl
 
 from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.db import agents_db
 from neutron.db import dhcp_rpc_base
@@ -48,8 +47,7 @@ class HyperVRpcCallbacks(
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self,
-                                         agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     def get_device_details(self, rpc_context, **kwargs):
         """Agent requests device details."""
index 1a5190d90e49d7d2c090a4ef2278abb732cf7be8..b1fa1e8b65aecb91216c9e65de479ee10c3a8502 100644 (file)
@@ -37,7 +37,6 @@ from neutron.common import utils as n_utils
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.ibm.common import config  # noqa
 from neutron.plugins.ibm.common import constants
 
@@ -156,7 +155,7 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback):
                                              "out-of-band")
 
     def create_rpc_dispatcher(self):
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
     def setup_integration_br(self, bridge_name, reset_br, out_of_band,
                              controller_ip=None):
index d3be17e5170a817137d200ea1ce4d069be2754e1..8a6615f2e4443115e45900af5ec09f5402ef6ff3 100644 (file)
@@ -23,7 +23,6 @@ from oslo.config import cfg
 
 from neutron.common import constants as n_const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -54,8 +53,7 @@ class SdnveRpcCallbacks():
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return n_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     def sdnve_info(self, rpc_context, **kwargs):
         '''Update new information.'''
index d586b2eb9e94d0554aff30575b7cf2f0ecb49d3d..5af3f674a37085f421791e958a4086901735f4a7 100755 (executable)
@@ -45,7 +45,6 @@ from neutron.common import utils as q_utils
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.linuxbridge.common import config  # noqa
 from neutron.plugins.linuxbridge.common import constants as lconst
@@ -816,7 +815,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
 
 class LinuxBridgePluginApi(agent_rpc.PluginApi,
index 9af9a616d5622196a941bbcfc6fc7116d4250dd9..61089f63cfcd7f5ceaa043626cc23e7205c9cfef 100644 (file)
@@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.api.v2 import attributes
 from neutron.common import constants as q_const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -72,8 +71,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @classmethod
     def get_port_from_device(cls, device):
index 3902278e46453aa9c6907fd077ea3bb5e9750aa0..4495dda01d298b9ce10632cab51f487b9c7f5ec2 100644 (file)
@@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as sa_exc
 from neutron.api.v2 import attributes
 from neutron.common import constants
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -189,8 +188,7 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         """
-        return n_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
 
 class MidonetPluginException(n_exc.NeutronException):
index ff4e6e7bce4c9130290f01e6a68b5b9a6d86a38d..e5068afb4c266716fa6e194cde0ef0475bebe2b4 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from oslo import messaging
+
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -46,13 +47,15 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
     #   1.0 Initial version (from openvswitch/linuxbridge)
     #   1.1 Support Security Group RPC
 
+    # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to
+    # inheritance problems
+    target = messaging.Target(version=RPC_API_VERSION)
+
     def __init__(self, notifier, type_manager):
         # REVISIT(kmestery): This depends on the first three super classes
         # not having their own __init__ functions. If an __init__() is added
         # to one, this could break. Fix this and add a unit test to cover this
         # test in H3.
-        # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due
-        # to inheritance problems
         super(RpcCallbacks, self).__init__(notifier, type_manager)
 
     def create_rpc_dispatcher(self):
@@ -61,8 +64,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @classmethod
     def _device_to_port_id(cls, device):
index 90a97ce443f461bbe9a5abbbf51752953d6f0d97..94fd2b89a4278f8302cb4afac297f22ecea13c29 100644 (file)
@@ -35,7 +35,6 @@ from neutron.common import utils as q_utils
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.mlnx.agent import utils
 from neutron.plugins.mlnx.common import config  # noqa
@@ -218,7 +217,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback,
         or support more than one class as the target of rpc messages,
         override this method.
         """
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
 
 class MlnxEswitchPluginApi(agent_rpc.PluginApi,
index fff970c43a04a7c7f345873859af73774811c430..0eda5143687e662cae13c6454b9926ca27aeaec1 100644 (file)
@@ -17,7 +17,6 @@
 from oslo.config import cfg
 
 from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.db import agents_db
 from neutron.db import api as db_api
@@ -48,8 +47,7 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback,
         or support more than one class as the target of RPC messages,
         override this method.
         """
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @classmethod
     def get_port_from_device(cls, device):
index 38b13b5b7c377a1a59c4f4286a1ad3ddf92bb138..c1f580ac2e0384a57705c8e3ca196c7693be47db 100755 (executable)
@@ -38,7 +38,6 @@ from neutron import context as q_context
 from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.nec.common import config
 
 
@@ -157,8 +156,7 @@ class NECNeutronAgent(object):
                                                 self, self.sg_agent)
         self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
                                                          self.sg_agent)
-        self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
-                                                    self.callback_sg])
+        self.dispatcher = [self.callback_nec, self.callback_sg]
         # Define the listening consumer for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
index ed35dcb17fd4e1a3bec83998d406f2193917b91c..70f4a1a63d1d529415909130afb552b5e17bcba7 100644 (file)
@@ -18,7 +18,6 @@
 from oslo.config import cfg
 
 from neutron.agent.common import config
-from neutron.openstack.common import rpc  # noqa
 from neutron.plugins.nec.common import constants as nconst
 
 
index e36f9d63eea46d8f43bedcef32ee8dfa971069c6..2bea5c04effa059f9f9df1bc0fcd00eb419f8184 100644 (file)
@@ -22,7 +22,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.v2 import attributes as attrs
 from neutron.common import constants as const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -147,12 +146,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 
         # NOTE: callback_sg is referred to from the sg unit test.
         self.callback_sg = SecurityGroupServerRpcCallback()
-        callbacks = [NECPluginV2RPCCallbacks(self.safe_reference),
-                     DhcpRpcCallback(),
-                     L3RpcCallback(),
-                     self.callback_sg,
-                     agents_db.AgentExtRpcCallback()]
-        self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
+        self.dispatcher = [
+            NECPluginV2RPCCallbacks(self.safe_reference),
+            DhcpRpcCallback(),
+            L3RpcCallback(),
+            self.callback_sg,
+            agents_db.AgentExtRpcCallback()]
         for svc_topic in self.service_topics.values():
             self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
         # Consume from all consumers in a thread
@@ -722,7 +721,7 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback):
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def update_ports(self, rpc_context, **kwargs):
         """Update ports' information and activate/deavtivate them.
index 7ff3040b067e7ea6f0caccf26388af4c49e8b4aa..c79d77a9154fa82e95abaefc4b64e22b019edb89 100644 (file)
@@ -39,7 +39,6 @@ from neutron.common import utils as n_utils
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ofagent.common import config  # noqa
 from neutron.plugins.openvswitch.common import constants
@@ -351,7 +350,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         """
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
     def _provision_local_vlan_outbound_for_tunnel(self, lvid,
                                                   segmentation_id, ofports):
index d1d3daf6efdf941a52bdae4f21ce47ce599b478b..0ef6348dfb428435dc3ea700ceffbbbd25bcc53e 100644 (file)
@@ -32,7 +32,6 @@ from neutron.common import topics
 from neutron import context as n_context
 from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import log as logging
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.oneconvergence.lib import config
 
 LOG = logging.getLogger(__name__)
@@ -120,8 +119,7 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback):
                                                 self, self.sg_agent)
         self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
                                                          self.sg_agent)
-        self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc,
-                                                    self.callback_sg])
+        self.dispatcher = [self.callback_oc, self.callback_sg]
         # Define the listening consumer for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
index 7d7af13b0d0247a143f1901e2a803ce953e52042..732ead70ae631a2f6f5b3fcb23c4e5905a83f8f1 100644 (file)
@@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.common import constants as q_const
 from neutron.common import exceptions as nexception
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -61,8 +60,7 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback,
 
     def create_rpc_dispatcher(self):
         """Get the rpc dispatcher for this manager."""
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @staticmethod
     def get_port_from_device(device):
index e6a58567a412dfc00b3e961a688ddc4925101e93..31c627484802fa5c67c1e5db26369232bbef7843 100644 (file)
@@ -41,7 +41,6 @@ from neutron.common import utils as q_utils
 from neutron import context
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.openvswitch.common import config  # noqa
 from neutron.plugins.openvswitch.common import constants
@@ -500,7 +499,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
     def provision_local_vlan(self, net_uuid, network_type, physical_network,
                              segmentation_id):
index 01867c4164c62014bf07911f3698d2ca8f1bc679..5e3f387b0f4f68ef9ba723d2e3029f774dbffcf2 100644 (file)
@@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.api.v2 import attributes
 from neutron.common import constants as q_const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -80,8 +79,7 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
     @classmethod
     def get_port_from_device(cls, device):
index 746a0c2f54cd131a337863c59db8c9ba043dd060..6086113c7f59cd6c8e169e1b2825fcd83cc3fffe 100755 (executable)
@@ -42,7 +42,6 @@ from neutron.common import topics
 from neutron import context as q_context
 from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import log
-from neutron.openstack.common.rpc import dispatcher
 from neutron.plugins.ryu.common import config  # noqa
 
 
@@ -209,7 +208,7 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback,
                                                      consumers)
 
     def _create_rpc_dispatcher(self):
-        return dispatcher.RpcDispatcher([self])
+        return [self]
 
     def _setup_integration_br(self, root_helper, integ_br,
                               tunnel_ip, ovsdb_port, ovsdb_ip):
index 35065a41e57eab0ba49ddd66d6e03441c8a79298..787ccb21c745141efa3f35542bd562ae353f5f3c 100644 (file)
@@ -23,7 +23,6 @@ from ryu.app import rest_nw_id
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.common import constants as q_const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import api as db
@@ -59,7 +58,7 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback,
         self.ofp_rest_api_addr = ofp_rest_api_addr
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def get_ofp_rest_api(self, context, **kwargs):
         LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
index 057e94d97018008a80b4af895b53774c6061f25b..c32a39b37246b053da8bf78fb0be883cd1899afd 100644 (file)
@@ -24,7 +24,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.v2 import attributes
 from neutron.common import constants as const
 from neutron.common import exceptions as ntn_exc
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.db import agents_db
 from neutron.db import db_base_plugin_v2
@@ -55,8 +54,7 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return n_rpc.PluginRpcDispatcher([self,
-                                          agents_db.AgentExtRpcCallback()])
+        return [self, agents_db.AgentExtRpcCallback()]
 
 
 def handle_network_dhcp_access(plugin, context, network, action):
index 4c64432b6f28b4b073717edca72da2c8ab382118..747638287f2e68eae34985510b2f5e2f6682101d 100644 (file)
@@ -26,7 +26,6 @@ from oslo.config import cfg
 from neutron.api.v2 import attributes
 from neutron.common import exceptions
 import neutron.common.utils as utils
-from neutron import manager
 from neutron.openstack.common import excutils
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
@@ -263,6 +262,9 @@ class OwnerCheck(policy.Check):
             # resource is handled by the core plugin. It might be worth
             # having a way to map resources to plugins so to make this
             # check more general
+            # FIXME(ihrachys): if import is put in global, circular
+            # import failure occurs
+            from neutron import manager
             f = getattr(manager.NeutronManager.get_instance().plugin,
                         'get_%s' % parent_res)
             # f *must* exist, if not found it is better to let neutron
index 9b3073b5fb4ded9990a7fa9228304c9ab17b7d9c..f14021769e85b1bbdb72084e6716ddf915493d22 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import eventlet
 import inspect
 import logging as std_logging
 import os
 import random
 
 from oslo.config import cfg
+from oslo.messaging import server as rpc_server
 
 from neutron.common import config
 from neutron.common import rpc_compat
@@ -112,23 +112,25 @@ class RpcWorker(object):
     """Wraps a worker to be handled by ProcessLauncher"""
     def __init__(self, plugin):
         self._plugin = plugin
-        self._server = None
+        self._servers = []
 
     def start(self):
         # We may have just forked from parent process.  A quick disposal of the
         # existing sql connections avoids producing errors later when they are
         # discovered to be broken.
         session.get_engine().pool.dispose()
-        self._server = self._plugin.start_rpc_listener()
+        self._servers = self._plugin.start_rpc_listener()
 
     def wait(self):
-        if isinstance(self._server, eventlet.greenthread.GreenThread):
-            self._server.wait()
+        for server in self._servers:
+            if isinstance(server, rpc_server.MessageHandlingServer):
+                server.wait()
 
     def stop(self):
-        if isinstance(self._server, eventlet.greenthread.GreenThread):
-            self._server.kill()
-            self._server = None
+        for server in self._servers:
+            if isinstance(server, rpc_server.MessageHandlingServer):
+                server.kill()
+            self._servers = []
 
 
 def serve_rpc():
index fd2131e219c4af926e99ab42aeeb200206c540d9..0238902f3beb7cdac63fc8879b616046c87170f5 100644 (file)
@@ -20,7 +20,6 @@
 from oslo.config import cfg
 
 from neutron.common import exceptions as n_exception
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron import context as neutron_context
@@ -42,7 +41,7 @@ class FirewallCallbacks(rpc_compat.RpcCallback):
         self.plugin = plugin
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def set_firewall_status(self, context, firewall_id, status, **kwargs):
         """Agent uses this to set a firewall's status."""
index c5505817d59449e6d85719338fa57f10a22fd5af..29950c984d8d410a9890d93bef7cfcbb0e4c75e6 100644 (file)
@@ -21,7 +21,6 @@ from oslo.config import cfg
 
 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
 from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import api as qdbapi
@@ -46,7 +45,7 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         """
-        return q_rpc.PluginRpcDispatcher([self])
+        return [self]
 
 
 class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
index 8436cb835459609dbed7db68d304555ee7da98a0..85be0bacd0ebc88d76f87201bcf020012a41cf04 100644 (file)
@@ -22,7 +22,6 @@ from oslo.config import cfg
 
 from neutron.common import constants as q_const
 from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.db import agents_db
@@ -66,8 +65,7 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback):
         self.plugin = plugin
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher(
-            [self, agents_db.AgentExtRpcCallback(self.plugin)])
+        return [self, agents_db.AgentExtRpcCallback(self.plugin)]
 
     def get_ready_devices(self, context, host=None):
         with context.session.begin(subtransactions=True):
index ba1fe6bac231e78a55cb4b0dce1ad213ed4e9dbd..80883f41b377057f125258d6a3ff931739885a4c 100644 (file)
@@ -26,6 +26,7 @@ from neutron.agent.common import config
 from neutron.agent import rpc as agent_rpc
 from neutron.common import config as common_config
 from neutron.common import constants as constants
+from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.common import utils
@@ -34,7 +35,6 @@ from neutron import manager
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.notifier import api as notifier_api
 from neutron.openstack.common import periodic_task
 from neutron.openstack.common import service
 from neutron import service as neutron_service
@@ -114,11 +114,8 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
                     'host': self.host}
 
             LOG.debug(_("Send metering report: %s"), data)
-            notifier_api.notify(self.context,
-                                notifier_api.publisher_id('metering'),
-                                'l3.meter',
-                                notifier_api.CONF.default_notification_level,
-                                data)
+            notifier = n_rpc.get_notifier('metering')
+            notifier.info(self.context, 'l3.meter', data)
             info['pkts'] = 0
             info['bytes'] = 0
             info['time'] = 0
index ba19460d772f278fe7a8649e80d9a81358923452..12904f23e342854d5528396acb963983dadbb1ce 100644 (file)
@@ -20,10 +20,10 @@ import requests
 
 import netaddr
 from oslo.config import cfg
+from oslo import messaging
 import six
 
 from neutron.common import exceptions
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron import context as ctx
 from neutron.openstack.common import lockutils
@@ -184,12 +184,13 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
 
     # history
     #   1.0 Initial version
-
     RPC_API_VERSION = '1.0'
 
+    # TODO(ihrachys): we can't use RpcCallback here due to inheritance
+    # issues
+    target = messaging.Target(version=RPC_API_VERSION)
+
     def __init__(self, agent, host):
-        # TODO(ihrachys): we can't use RpcCallback here due to
-        # inheritance issues
         self.host = host
         self.conn = rpc_compat.create_connection(new=True)
         context = ctx.get_admin_context_without_session()
@@ -225,7 +226,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
                           for k, v in csrs_found.items()])
 
     def create_rpc_dispatcher(self):
-        return n_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def vpnservice_updated(self, context, **kwargs):
         """Handle VPNaaS service driver change notifications."""
index 0d9ded9604542ef18ecc3a59fbfd546180ea9c89..2480eb27279776b306e5beff131c70f48dde4606 100644 (file)
@@ -23,11 +23,11 @@ import shutil
 import jinja2
 import netaddr
 from oslo.config import cfg
+from oslo import messaging
 import six
 
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
-from neutron.common import rpc as q_rpc
 from neutron.common import rpc_compat
 from neutron import context
 from neutron.openstack.common import lockutils
@@ -487,9 +487,11 @@ class IPsecDriver(device_drivers.DeviceDriver):
 
     RPC_API_VERSION = '1.0'
 
+    # TODO(ihrachys): we can't use RpcCallback here due to inheritance
+    # issues
+    target = messaging.Target(version=RPC_API_VERSION)
+
     def __init__(self, agent, host):
-        # TODO(ihrachys): we can't use RpcCallback here due to
-        # inheritance issues
         self.agent = agent
         self.conf = self.agent.conf
         self.root_helper = self.agent.root_helper
@@ -514,7 +516,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
             interval=self.conf.ipsec.ipsec_status_check_interval)
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def _update_nat(self, vpnservice, func):
         """Setting up nat rule in iptables.
index 85657231977e0dfd18795ba652cb7208fe6c667b..c2b39da9e353e60c7a270a67694a12fbebbdcd91 100644 (file)
@@ -16,7 +16,6 @@ import netaddr
 from netaddr import core as net_exc
 
 from neutron.common import exceptions
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.openstack.common import excutils
 from neutron.openstack.common import log as logging
@@ -55,7 +54,7 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback):
         self.driver = driver
 
     def create_rpc_dispatcher(self):
-        return n_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def get_vpn_services_on_host(self, context, host=None):
         """Retuns info on the vpnservices on the host."""
index cf4b055d89b05fbeb215dc0707464d46c077df15..13b7c171b4150246b83f2ed436ff4ce85ba95e02 100644 (file)
@@ -16,7 +16,6 @@
 #    under the License.
 import netaddr
 
-from neutron.common import rpc as n_rpc
 from neutron.common import rpc_compat
 from neutron.openstack.common import log as logging
 from neutron.services.vpn.common import topics
@@ -42,7 +41,7 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback):
         self.driver = driver
 
     def create_rpc_dispatcher(self):
-        return n_rpc.PluginRpcDispatcher([self])
+        return [self]
 
     def get_vpn_services_on_host(self, context, host=None):
         """Returns the vpnservices on the host."""
index 87412f92444445cc5160bda75ca2786a1e00eba7..95034f65381d01b2006f0b62c0e161cd8a0a9700 100644 (file)
@@ -29,15 +29,14 @@ import eventlet.timeout
 import fixtures
 import mock
 from oslo.config import cfg
+from oslo.messaging import conffixture as messaging_conffixture
 import testtools
 
 from neutron.common import config
+from neutron.common import rpc as n_rpc
 from neutron.db import agentschedulers_db
 from neutron import manager
-from neutron.openstack.common.notifier import api as notifier_api
-from neutron.openstack.common.notifier import test_notifier
-from neutron.openstack.common import rpc
-from neutron.openstack.common.rpc import impl_fake
+from neutron.tests import fake_notifier
 from neutron.tests import post_mortem_debug
 
 
@@ -58,6 +57,10 @@ def fake_use_fatal_exceptions(*args):
     return True
 
 
+def fake_consume_in_threads(self):
+    return []
+
+
 class BaseTestCase(testtools.TestCase):
 
     def cleanup_core_plugin(self):
@@ -90,16 +93,10 @@ class BaseTestCase(testtools.TestCase):
         if core_plugin is not None:
             cfg.CONF.set_override('core_plugin', core_plugin)
 
-    def _cleanup_test_notifier(self):
-        test_notifier.NOTIFICATIONS = []
-
     def setup_notification_driver(self, notification_driver=None):
-        # to reload the drivers
-        self.addCleanup(notifier_api._reset_drivers)
-        self.addCleanup(self._cleanup_test_notifier)
-        notifier_api._reset_drivers()
+        self.addCleanup(fake_notifier.reset)
         if notification_driver is None:
-            notification_driver = [test_notifier.__name__]
+            notification_driver = [fake_notifier.__name__]
         cfg.CONF.set_override("notification_driver", notification_driver)
 
     @staticmethod
@@ -113,10 +110,6 @@ class BaseTestCase(testtools.TestCase):
         else:
             conf(args)
 
-    def _cleanup_rpc_backend(self):
-        rpc._RPCIMPL = None
-        impl_fake.CONSUMERS.clear()
-
     def setUp(self):
         super(BaseTestCase, self).setUp()
 
@@ -124,8 +117,6 @@ class BaseTestCase(testtools.TestCase):
         # test-specific cleanup has a chance to release references.
         self.addCleanup(self.cleanup_core_plugin)
 
-        self.addCleanup(self._cleanup_rpc_backend)
-
         # Configure this first to ensure pm debugging support for setUp()
         if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING:
             self.addOnException(post_mortem_debug.exception_handler)
@@ -179,6 +170,25 @@ class BaseTestCase(testtools.TestCase):
             'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
             fake_use_fatal_exceptions))
 
+        # don't actually start RPC listeners when testing
+        self.useFixture(fixtures.MonkeyPatch(
+            'neutron.common.rpc_compat.Connection.consume_in_thread',
+            fake_consume_in_threads))
+
+        self.useFixture(fixtures.MonkeyPatch(
+            'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
+
+        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
+        self.messaging_conf.transport_driver = 'fake'
+        self.messaging_conf.response_timeout = 15
+        self.useFixture(self.messaging_conf)
+
+        self.addCleanup(n_rpc.clear_extra_exmods)
+        n_rpc.add_extra_exmods('neutron.test')
+
+        self.addCleanup(n_rpc.cleanup)
+        n_rpc.init(CONF)
+
         if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
             raise self.skipException('XML Testing Skipped in Py26')
 
diff --git a/neutron/tests/fake_notifier.py b/neutron/tests/fake_notifier.py
new file mode 100644 (file)
index 0000000..012f335
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright 2014 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import functools
+
+
+NOTIFICATIONS = []
+
+
+def reset():
+    del NOTIFICATIONS[:]
+
+
+FakeMessage = collections.namedtuple('Message',
+                                     ['publisher_id', 'priority',
+                                      'event_type', 'payload'])
+
+
+class FakeNotifier(object):
+
+    def __init__(self, transport, publisher_id=None):
+        self.transport = transport
+        self.publisher_id = publisher_id
+        for priority in ('debug', 'info', 'warn', 'error', 'critical'):
+            setattr(self, priority,
+                    functools.partial(self._notify, priority=priority.upper()))
+
+    def prepare(self, publisher_id=None):
+        if publisher_id is None:
+            publisher_id = self.publisher_id
+        return self.__class__(self.transport, publisher_id)
+
+    def _notify(self, ctxt, event_type, payload, priority):
+        msg = dict(publisher_id=self.publisher_id,
+                   priority=priority,
+                   event_type=event_type,
+                   payload=payload)
+        NOTIFICATIONS.append(msg)
index 4af19fc546ba4153d4043ec17eaeb497305d1afb..965842738b6dc8cd32982db371b96fdbee1302e4 100644 (file)
@@ -23,9 +23,9 @@ Unit Tests for hyperv neutron rpc
 import mock
 
 from neutron.agent import rpc as agent_rpc
+from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.openstack.common import context
-from neutron.openstack.common import rpc
 from neutron.plugins.hyperv import agent_notifier_api as ana
 from neutron.plugins.hyperv.common import constants
 from neutron.tests import base
@@ -38,19 +38,19 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
         ctxt = context.RequestContext('fake_user', 'fake_project')
         expected_retval = 'foo' if method == 'call' else None
         expected_msg = rpcapi.make_msg(method, **kwargs)
-        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
         if rpc_method == 'cast' and method == 'run_instance':
             kwargs['call'] = False
 
-        with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
+        proxy = rpc_compat.RpcProxy
+        with mock.patch.object(proxy, rpc_method) as rpc_method_mock:
             rpc_method_mock.return_value = expected_retval
             retval = getattr(rpcapi, method)(ctxt, **kwargs)
 
         self.assertEqual(retval, expected_retval)
-        expected_args = [ctxt, topic, expected_msg]
-        for arg, expected_arg in zip(rpc_method_mock.call_args[0],
-                                     expected_args):
-            self.assertEqual(arg, expected_arg)
+        expected = [
+            mock.call(ctxt, expected_msg, topic=topic)
+        ]
+        rpc_method_mock.assert_has_calls(expected)
 
     def test_delete_network(self):
         rpcapi = ana.AgentNotifierApi(topics.AGENT)
index 762a65be1d36db6a07de32e696092a6aa8dbbd81..616a06acd9ed9943bfaf6ade083362851e4df861 100644 (file)
@@ -35,7 +35,6 @@ class rpcApiTestCase(base.BaseTestCase):
         expected_retval = 'foo' if method == 'call' else None
         if not expected_msg:
             expected_msg = rpcapi.make_msg(method, **kwargs)
-        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
         if rpc_method == 'cast' and method == 'run_instance':
             kwargs['call'] = False
 
@@ -49,15 +48,19 @@ class rpcApiTestCase(base.BaseTestCase):
                 return expected_retval
 
         self.useFixture(fixtures.MonkeyPatch(
-            'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+            'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+            _fake_rpc_method))
 
         retval = getattr(rpcapi, method)(ctxt, **kwargs)
 
         self.assertEqual(expected_retval, retval)
-        expected_args = [ctxt, topic, expected_msg]
+        expected_args = [ctxt, expected_msg]
+        expected_kwargs = {'topic': topic}
 
-        for arg, expected_arg in zip(self.fake_args, expected_args):
+        # skip the first argument which is 'self'
+        for arg, expected_arg in zip(self.fake_args[1:], expected_args):
             self.assertEqual(expected_arg, arg)
+        self.assertEqual(expected_kwargs, self.fake_kwargs)
 
     def test_delete_network(self):
         rpcapi = plb.AgentNotifierApi(topics.AGENT)
index a2d3bf0eb5a17c04cbaeeee20db2407287fcd2a2..af48a74f171c53ad08727c3b195ed08d196fb7a2 100644 (file)
@@ -20,9 +20,9 @@ Unit Tests for ml2 rpc
 import mock
 
 from neutron.agent import rpc as agent_rpc
+from neutron.common import rpc_compat
 from neutron.common import topics
 from neutron.openstack.common import context
-from neutron.openstack.common import rpc
 from neutron.plugins.ml2.drivers import type_tunnel
 from neutron.plugins.ml2 import rpc as plugin_rpc
 from neutron.tests import base
@@ -34,20 +34,19 @@ class RpcApiTestCase(base.BaseTestCase):
         ctxt = context.RequestContext('fake_user', 'fake_project')
         expected_retval = 'foo' if method == 'call' else None
         expected_msg = rpcapi.make_msg(method, **kwargs)
-        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
         if rpc_method == 'cast' and method == 'run_instance':
             kwargs['call'] = False
 
+        rpc = rpc_compat.RpcProxy
         with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
             rpc_method_mock.return_value = expected_retval
             retval = getattr(rpcapi, method)(ctxt, **kwargs)
 
         self.assertEqual(retval, expected_retval)
-
-        expected_args = [ctxt, topic, expected_msg]
-        for arg, expected_arg in zip(rpc_method_mock.call_args[0],
-                                     expected_args):
-            self.assertEqual(arg, expected_arg)
+        expected = [
+            mock.call(ctxt, expected_msg, topic=topic)
+        ]
+        rpc_method_mock.assert_has_calls(expected)
 
     def test_delete_network(self):
         rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
index 80dcf78277fc4625371563134ea6308bd5efc0d2..ea34a840b1a61aed03c59e96c2c61400f1c3152c 100644 (file)
@@ -37,7 +37,6 @@ class rpcApiTestCase(base.BaseTestCase):
         expected_retval = 'foo' if method == 'call' else None
         if not expected_msg:
             expected_msg = rpcapi.make_msg(method, **kwargs)
-        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
         if rpc_method == 'cast' and method == 'run_instance':
             kwargs['call'] = False
 
@@ -51,15 +50,19 @@ class rpcApiTestCase(base.BaseTestCase):
                 return expected_retval
 
         self.useFixture(fixtures.MonkeyPatch(
-            'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+            'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+            _fake_rpc_method))
 
         retval = getattr(rpcapi, method)(ctxt, **kwargs)
 
         self.assertEqual(expected_retval, retval)
-        expected_args = [ctxt, topic, expected_msg]
+        expected_args = [ctxt, expected_msg]
+        expected_kwargs = {'topic': topic}
 
-        for arg, expected_arg in zip(self.fake_args, expected_args):
+        # skip the first argument which is 'self'
+        for arg, expected_arg in zip(self.fake_args[1:], expected_args):
             self.assertEqual(expected_arg, arg)
+        self.assertEqual(expected_kwargs, self.fake_kwargs)
 
     def test_delete_network(self):
         rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
index 1b6a7370a7b14075a2ceee5f8fbbf69144e90b74..e8f75b9f4c13a9d8e97cbd12f8909306138d8458 100644 (file)
@@ -34,7 +34,6 @@ class rpcApiTestCase(base.BaseTestCase):
         ctxt = context.RequestContext('fake_user', 'fake_project')
         expected_retval = 'foo' if method == 'call' else None
         expected_msg = rpcapi.make_msg(method, **kwargs)
-        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
         if rpc_method == 'cast' and method == 'run_instance':
             kwargs['call'] = False
 
@@ -48,15 +47,19 @@ class rpcApiTestCase(base.BaseTestCase):
                 return expected_retval
 
         self.useFixture(fixtures.MonkeyPatch(
-            'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+            'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+            _fake_rpc_method))
 
         retval = getattr(rpcapi, method)(ctxt, **kwargs)
 
         self.assertEqual(retval, expected_retval)
-        expected_args = [ctxt, topic, expected_msg]
+        expected_args = [ctxt, expected_msg]
+        expected_kwargs = {'topic': topic}
 
-        for arg, expected_arg in zip(self.fake_args, expected_args):
+        # skip the first argument which is 'self'
+        for arg, expected_arg in zip(self.fake_args[1:], expected_args):
             self.assertEqual(arg, expected_arg)
+        self.assertEqual(expected_kwargs, self.fake_kwargs)
 
     def test_delete_network(self):
         rpcapi = povs.AgentNotifierApi(topics.AGENT)
index 3e1d0db299eb84fbf714f2a01e8f25c80e41a685..b3e3511fea985abfd92338d3dfebaea844f31c41 100644 (file)
@@ -18,10 +18,10 @@ import mock
 from oslo.config import cfg
 
 from neutron.agent.common import config
-from neutron.openstack.common.notifier import test_notifier
 from neutron.openstack.common import uuidutils
 from neutron.services.metering.agents import metering_agent
 from neutron.tests import base
+from neutron.tests import fake_notifier
 
 
 _uuid = uuidutils.generate_uuid
@@ -96,8 +96,8 @@ class TestMeteringOperations(base.BaseTestCase):
                                                           'bytes': 444}}
         self.agent._metering_loop()
 
-        self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0)
-        for n in test_notifier.NOTIFICATIONS:
+        self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0)
+        for n in fake_notifier.NOTIFICATIONS:
             if n['event_type'] == 'l3.meter':
                 break
 
index bc4ae4a17805d34b4463a242a8f99468314d5a14..569a739566611b764e4dce30bab3530c410b9248 100644 (file)
@@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase):
         agent = rpc.PluginApi('fake_topic')
         ctxt = context.RequestContext('fake_user', 'fake_project')
         expect_val = 'foo'
-        with mock.patch('neutron.openstack.common.rpc.call') as rpc_call:
+        with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call:
             rpc_call.return_value = expect_val
             func_obj = getattr(agent, method)
             if method == 'tunnel_sync':
index c09dd21ee90cbf3fea7e81180b421eb6c32980dc..38d54f7cace06dd8103b8c3507aa9d2395b73f83 100644 (file)
@@ -33,12 +33,12 @@ from neutron.api.v2 import router
 from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron import manager
-from neutron.openstack.common.notifier import api as notifer_api
 from neutron.openstack.common import policy as common_policy
 from neutron.openstack.common import uuidutils
 from neutron import policy
 from neutron import quota
 from neutron.tests import base
+from neutron.tests import fake_notifier
 from neutron.tests.unit import testlib_api
 
 
@@ -1242,41 +1242,42 @@ class V2Views(base.BaseTestCase):
 
 
 class NotificationTest(APIv2TestBase):
-    def _resource_op_notifier(self, opname, resource, expected_errors=False,
-                              notification_level='INFO'):
+
+    def setUp(self):
+        super(NotificationTest, self).setUp()
+        fake_notifier.reset()
+
+    def _resource_op_notifier(self, opname, resource, expected_errors=False):
         initial_input = {resource: {'name': 'myname'}}
         instance = self.plugin.return_value
         instance.get_networks.return_value = initial_input
         instance.get_networks_count.return_value = 0
         expected_code = exc.HTTPCreated.code
-        with mock.patch.object(notifer_api, 'notify') as mynotifier:
-            if opname == 'create':
-                initial_input[resource]['tenant_id'] = _uuid()
-                res = self.api.post_json(
-                    _get_path('networks'),
-                    initial_input, expect_errors=expected_errors)
-            if opname == 'update':
-                res = self.api.put_json(
-                    _get_path('networks', id=_uuid()),
-                    initial_input, expect_errors=expected_errors)
-                expected_code = exc.HTTPOk.code
-            if opname == 'delete':
-                initial_input[resource]['tenant_id'] = _uuid()
-                res = self.api.delete(
-                    _get_path('networks', id=_uuid()),
-                    expect_errors=expected_errors)
-                expected_code = exc.HTTPNoContent.code
-            expected = [mock.call(mock.ANY,
-                                  'network.' + cfg.CONF.host,
-                                  resource + "." + opname + ".start",
-                                  notification_level,
-                                  mock.ANY),
-                        mock.call(mock.ANY,
-                                  'network.' + cfg.CONF.host,
-                                  resource + "." + opname + ".end",
-                                  notification_level,
-                                  mock.ANY)]
-            self.assertEqual(expected, mynotifier.call_args_list)
+        if opname == 'create':
+            initial_input[resource]['tenant_id'] = _uuid()
+            res = self.api.post_json(
+                _get_path('networks'),
+                initial_input, expect_errors=expected_errors)
+        if opname == 'update':
+            res = self.api.put_json(
+                _get_path('networks', id=_uuid()),
+                initial_input, expect_errors=expected_errors)
+            expected_code = exc.HTTPOk.code
+        if opname == 'delete':
+            initial_input[resource]['tenant_id'] = _uuid()
+            res = self.api.delete(
+                _get_path('networks', id=_uuid()),
+                expect_errors=expected_errors)
+            expected_code = exc.HTTPNoContent.code
+
+        expected_events = ('.'.join([resource, opname, "start"]),
+                           '.'.join([resource, opname, "end"]))
+        self.assertEqual(len(fake_notifier.NOTIFICATIONS),
+                         len(expected_events))
+        for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events):
+            self.assertEqual('INFO', msg['priority'])
+            self.assertEqual(event, msg['event_type'])
+
         self.assertEqual(res.status_int, expected_code)
 
     def test_network_create_notifer(self):
@@ -1288,11 +1289,6 @@ class NotificationTest(APIv2TestBase):
     def test_network_update_notifer(self):
         self._resource_op_notifier('update', 'network')
 
-    def test_network_create_notifer_with_log_level(self):
-        cfg.CONF.set_override('default_notification_level', 'DEBUG')
-        self._resource_op_notifier('create', 'network',
-                                   notification_level='DEBUG')
-
 
 class DHCPNotificationTest(APIv2TestBase):
     def _test_dhcp_notifier(self, opname, resource, initial_input=None):
index b02ba15f8743af61e11b84e3ab840ac7580be840..4eb80d0d33dfb3d6941b29e834804ddf8fed5632 100644 (file)
@@ -38,9 +38,9 @@ from neutron.extensions import l3
 from neutron import manager
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import test_notifier
 from neutron.openstack.common import uuidutils
 from neutron.plugins.common import constants as service_constants
+from neutron.tests import fake_notifier
 from neutron.tests.unit import test_agent_ext_plugin
 from neutron.tests.unit import test_api_v2
 from neutron.tests.unit import test_api_v2_extension
@@ -660,7 +660,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
                              'subnet.create.end',
                              'router.interface.create',
                              'router.interface.delete']
-        test_notifier.NOTIFICATIONS = []
+        fake_notifier.reset()
         with self.router() as r:
             with self.subnet() as s:
                 body = self._router_interface_action('add',
@@ -683,9 +683,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
 
                 self.assertEqual(
                     set(exp_notifications),
-                    set(n['event_type'] for n in test_notifier.NOTIFICATIONS))
+                    set(n['event_type'] for n in fake_notifier.NOTIFICATIONS))
 
-                for n in test_notifier.NOTIFICATIONS:
+                for n in fake_notifier.NOTIFICATIONS:
                     if n['event_type'].startswith('router.interface.'):
                         payload = n['payload']['router_interface']
                         self.assertIn('id', payload)
index 5ba04f255f5e3115c70094f887335bc572e912d1..f34177ab21a39aaae4fc85a7859e9766a8488959 100644 (file)
@@ -23,6 +23,7 @@ alembic>=0.4.1
 six>=1.7.0
 stevedore>=0.14
 oslo.config>=1.2.1
+oslo.messaging>=1.3.0
 oslo.rootwrap
 
 python-novaclient>=2.17.0
index cc4db51b9c44f9d6ca48145016d20b9dd62b1360..0eaaaed0f668c1ac7ed886fe750b60e7b76e44b2 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -169,6 +169,14 @@ neutron.ml2.mechanism_drivers =
     fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver
 neutron.openstack.common.cache.backends =
     memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
+# These are for backwards compat with Icehouse notification_driver configuration values
+oslo.messaging.notify.drivers =
+    neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver
+    neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver
+    neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver
+    neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver
+    neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver
+
 
 [build_sphinx]
 all_files = 1