from neutron.api.v2 import resource as wsgi_resource
from neutron.common import constants as const
from neutron.common import exceptions
+from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import api as notifier_api
from neutron import policy
from neutron import quota
self._native_sorting = self._is_native_sorting_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
- self._publisher_id = notifier_api.publisher_id('network')
+ self._notifier = n_rpc.get_notifier('network')
# use plugin's dhcp notifier, if this is already instantiated
agent_notifiers = getattr(plugin, 'agent_notifiers', {})
self._dhcp_agent_notifier = (
def create(self, request, body=None, **kwargs):
"""Creates a new instance of the requested entity."""
parent_id = kwargs.get(self._parent_id_name)
- notifier_api.notify(request.context,
- self._publisher_id,
+ self._notifier.info(request.context,
self._resource + '.create.start',
- notifier_api.CONF.default_notification_level,
body)
body = Controller.prepare_request_body(request.context, body, True,
self._resource, self._attr_info,
def notify(create_result):
notifier_method = self._resource + '.create.end'
- notifier_api.notify(request.context,
- self._publisher_id,
+ self._notifier.info(request.context,
notifier_method,
- notifier_api.CONF.default_notification_level,
create_result)
self._send_dhcp_notification(request.context,
create_result,
def delete(self, request, id, **kwargs):
"""Deletes the specified entity."""
- notifier_api.notify(request.context,
- self._publisher_id,
+ self._notifier.info(request.context,
self._resource + '.delete.start',
- notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
action = self._plugin_handlers[self.DELETE]
obj_deleter = getattr(self._plugin, action)
obj_deleter(request.context, id, **kwargs)
notifier_method = self._resource + '.delete.end'
- notifier_api.notify(request.context,
- self._publisher_id,
+ self._notifier.info(request.context,
notifier_method,
- notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
result = {self._resource: self._view(request.context, obj)}
self._send_nova_notification(action, {}, result)
msg = _("Invalid format: %s") % request.body
raise exceptions.BadRequest(resource='body', msg=msg)
payload['id'] = id
- notifier_api.notify(request.context,
- self._publisher_id,
+ self._notifier.info(request.context,
self._resource + '.update.start',
- notifier_api.CONF.default_notification_level,
payload)
body = Controller.prepare_request_body(request.context, body, False,
self._resource, self._attr_info,
obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(request.context, obj)}
notifier_method = self._resource + '.update.end'
- notifier_api.notify(request.context,
- self._publisher_id,
- notifier_method,
- notifier_api.CONF.default_notification_level,
- result)
+ self._notifier.info(request.context, notifier_method, result)
self._send_dhcp_notification(request.context,
result,
notifier_method)
from oslo.config import cfg
from neutron.common import config
+from neutron.common import rpc as n_rpc
from neutron import context
from neutron import manager
-from neutron.openstack.common.notifier import api as notifier_api
def main():
cxt = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
+ notifier = n_rpc.get_notifier('network')
for network in plugin.get_networks(cxt):
- notifier_api.notify(cxt,
- notifier_api.publisher_id('network'),
- 'network.exists',
- notifier_api.INFO,
- {'network': network})
+ notifier.info(cxt, 'network.exists', {'network': network})
for subnet in plugin.get_subnets(cxt):
- notifier_api.notify(cxt,
- notifier_api.publisher_id('network'),
- 'subnet.exists',
- notifier_api.INFO,
- {'subnet': subnet})
+ notifier.info(cxt, 'subnet.exists', {'subnet': subnet})
for port in plugin.get_ports(cxt):
- notifier_api.notify(cxt,
- notifier_api.publisher_id('network'),
- 'port.exists',
- notifier_api.INFO,
- {'port': port})
+ notifier.info(cxt, 'port.exists', {'port': port})
for router in plugin.get_routers(cxt):
- notifier_api.notify(cxt,
- notifier_api.publisher_id('network'),
- 'router.exists',
- notifier_api.INFO,
- {'router': router})
+ notifier.info(cxt, 'router.exists', {'router': router})
for floatingip in plugin.get_floatingips(cxt):
- notifier_api.notify(cxt,
- notifier_api.publisher_id('network'),
- 'floatingip.exists',
- notifier_api.INFO,
- {'floatingip': floatingip})
+ notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip})
import os
from oslo.config import cfg
+from oslo import messaging
from paste import deploy
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron.openstack.common.db import options as db_options
from neutron.openstack.common import log as logging
-from neutron.openstack.common import rpc
from neutron import version
cfg.CONF.register_cli_opts(core_cli_opts)
# Ensure that the control exchange is set correctly
-rpc.set_defaults(control_exchange='neutron')
+messaging.set_transport_defaults(control_exchange='neutron')
_SQL_CONNECTION_DEFAULT = 'sqlite://'
# Update the default QueuePool parameters. These can be tweaked by the
# configuration variables - max_pool_size, max_overflow and pool_timeout
version='%%prog %s' % version.version_info.release_string(),
**kwargs)
+ # FIXME(ihrachys): if import is put in global, circular import
+ # failure occurs
+ from neutron.common import rpc as n_rpc
+ n_rpc.init(cfg.CONF)
+
# Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac,
attributes.MAC_PATTERN)
# License for the specific language governing permissions and limitations
# under the License.
+from oslo.config import cfg
+from oslo import messaging
+from oslo.messaging import serializer as om_serializer
+
+from neutron.common import exceptions
from neutron import context
from neutron.openstack.common import log as logging
-from neutron.openstack.common.rpc import dispatcher
LOG = logging.getLogger(__name__)
-class PluginRpcDispatcher(dispatcher.RpcDispatcher):
- """This class is used to convert RPC common context into
+TRANSPORT = None
+NOTIFIER = None
+
+ALLOWED_EXMODS = [
+ exceptions.__name__,
+]
+EXTRA_EXMODS = []
+
+
+TRANSPORT_ALIASES = {
+ 'neutron.openstack.common.rpc.impl_fake': 'fake',
+ 'neutron.openstack.common.rpc.impl_qpid': 'qpid',
+ 'neutron.openstack.common.rpc.impl_kombu': 'rabbit',
+ 'neutron.openstack.common.rpc.impl_zmq': 'zmq',
+ 'neutron.rpc.impl_fake': 'fake',
+ 'neutron.rpc.impl_qpid': 'qpid',
+ 'neutron.rpc.impl_kombu': 'rabbit',
+ 'neutron.rpc.impl_zmq': 'zmq',
+}
+
+
+def init(conf):
+ global TRANSPORT, NOTIFIER
+ exmods = get_allowed_exmods()
+ TRANSPORT = messaging.get_transport(conf,
+ allowed_remote_exmods=exmods,
+ aliases=TRANSPORT_ALIASES)
+ NOTIFIER = messaging.Notifier(TRANSPORT)
+
+
+def cleanup():
+ global TRANSPORT, NOTIFIER
+ assert TRANSPORT is not None
+ assert NOTIFIER is not None
+ TRANSPORT.cleanup()
+ TRANSPORT = NOTIFIER = None
+
+
+def add_extra_exmods(*args):
+ EXTRA_EXMODS.extend(args)
+
+
+def clear_extra_exmods():
+ del EXTRA_EXMODS[:]
+
+
+def get_allowed_exmods():
+ return ALLOWED_EXMODS + EXTRA_EXMODS
+
+
+def get_client(target, version_cap=None, serializer=None):
+ assert TRANSPORT is not None
+ serializer = PluginRpcSerializer(serializer)
+ return messaging.RPCClient(TRANSPORT,
+ target,
+ version_cap=version_cap,
+ serializer=serializer)
+
+
+def get_server(target, endpoints, serializer=None):
+ assert TRANSPORT is not None
+ serializer = PluginRpcSerializer(serializer)
+ return messaging.get_rpc_server(TRANSPORT,
+ target,
+ endpoints,
+ executor='eventlet',
+ serializer=serializer)
+
+
+def get_notifier(service=None, host=None, publisher_id=None):
+ assert NOTIFIER is not None
+ if not publisher_id:
+ publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
+ return NOTIFIER.prepare(publisher_id=publisher_id)
+
+
+class PluginRpcSerializer(om_serializer.Serializer):
+ """This serializer is used to convert RPC common context into
Neutron Context.
"""
+ def __init__(self, base):
+ super(PluginRpcSerializer, self).__init__()
+ self._base = base
+
+ def serialize_entity(self, ctxt, entity):
+ if not self._base:
+ return entity
+ return self._base.serialize_entity(ctxt, entity)
+
+ def deserialize_entity(self, ctxt, entity):
+ if not self._base:
+ return entity
+ return self._base.deserialize_entity(ctxt, entity)
- def __init__(self, callbacks):
- super(PluginRpcDispatcher, self).__init__(callbacks)
+ def serialize_context(self, ctxt):
+ return ctxt.to_dict()
- def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
- rpc_ctxt_dict = rpc_ctxt.to_dict()
+ def deserialize_context(self, ctxt):
+ rpc_ctxt_dict = ctxt.copy()
user_id = rpc_ctxt_dict.pop('user_id', None)
if not user_id:
user_id = rpc_ctxt_dict.pop('user', None)
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
if not tenant_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
- neutron_ctxt = context.Context(user_id, tenant_id,
- load_admin_roles=False, **rpc_ctxt_dict)
- return super(PluginRpcDispatcher, self).dispatch(
- neutron_ctxt, version, method, namespace, **kwargs)
+ return context.Context(user_id, tenant_id,
+ load_admin_roles=False, **rpc_ctxt_dict)
# License for the specific language governing permissions and limitations
# under the License.
+from oslo.config import cfg
+from oslo import messaging
+
+from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
-from neutron.openstack.common import rpc
-from neutron.openstack.common.rpc import common as rpc_common
-from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
-from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
-class RpcProxy(proxy.RpcProxy):
+class RpcProxy(object):
'''
This class is created to facilitate migration from oslo-incubator
RPC layer implementation to oslo.messaging and is intended to
emulate RpcProxy class behaviour using oslo.messaging API once the
migration is applied.
'''
+ RPC_API_NAMESPACE = None
+
+ def __init__(self, topic, default_version, version_cap=None):
+ self.topic = topic
+ target = messaging.Target(topic=topic, version=default_version)
+ self._client = n_rpc.get_client(target, version_cap=version_cap)
+
+ def make_msg(self, method, **kwargs):
+ return {'method': method,
+ 'namespace': self.RPC_API_NAMESPACE,
+ 'args': kwargs}
+
+ def call(self, context, msg, **kwargs):
+ return self.__call_rpc_method(
+ context, msg, rpc_method='call', **kwargs)
+
+ def cast(self, context, msg, **kwargs):
+ self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
+
+ def fanout_cast(self, context, msg, **kwargs):
+ kwargs['fanout'] = True
+ self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs)
+
+ def __call_rpc_method(self, context, msg, **kwargs):
+ options = dict(
+ ((opt, kwargs[opt])
+ for opt in ('fanout', 'timeout', 'topic', 'version')
+ if kwargs.get(opt))
+ )
+ if msg['namespace']:
+ options['namespace'] = msg['namespace']
+
+ if options:
+ callee = self._client.prepare(**options)
+ else:
+ callee = self._client
+
+ func = getattr(callee, kwargs['rpc_method'])
+ return func(context, msg['method'], **msg['args'])
class RpcCallback(object):
callback version using oslo.messaging API once the migration is
applied.
'''
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(RpcCallback, self).__init__()
+ self.target = messaging.Target(version=self.RPC_API_VERSION)
class Service(service.Service):
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
- self.serializer)
+ dispatcher = [self.manager]
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
super(Service, self).stop()
+class Connection(object):
+
+ def __init__(self):
+ super(Connection, self).__init__()
+ self.servers = []
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ target = messaging.Target(
+ topic=topic, server=cfg.CONF.host, fanout=fanout)
+ server = n_rpc.get_server(target, proxy)
+ self.servers.append(server)
+
+ def consume_in_thread(self):
+ for server in self.servers:
+ server.start()
+ return self.servers
+
+
# functions
-create_connection = rpc.create_connection
+def create_connection(new=True):
+ return Connection()
# exceptions
-RPCException = rpc_common.RPCException
-RemoteError = rpc_common.RemoteError
-MessagingTimeout = rpc_common.Timeout
+RPCException = messaging.MessagingException
+RemoteError = messaging.RemoteError
+MessagingTimeout = messaging.MessagingTimeout
from neutron.api.v2 import attributes
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
+from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import l3
from neutron import manager
from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
- notifier_api.notify(context,
- notifier_api.publisher_id('network'),
- 'router.interface.create',
- notifier_api.CONF.default_notification_level,
- {'router_interface': info})
+ notifier = n_rpc.get_notifier('network')
+ notifier.info(
+ context, 'router.interface.create', {'router_interface': info})
return info
def _confirm_router_interface_not_in_use(self, context, router_id,
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': subnet['id']}
- notifier_api.notify(context,
- notifier_api.publisher_id('network'),
- 'router.interface.delete',
- notifier_api.CONF.default_notification_level,
- {'router_interface': info})
+ notifier = n_rpc.get_notifier('network')
+ notifier.info(
+ context, 'router.interface.delete', {'router_interface': info})
return info
def _get_floatingip(self, context, id):
# under the License.
from neutron.common import constants as consts
-from neutron.common import rpc as p_rpc
from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
self.meter_plugin = meter_plugin
def create_rpc_dispatcher(self):
- return p_rpc.PluginRpcDispatcher([self])
+ return [self]
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
from neutron.openstack.common import threadgroup
-rpc = importutils.try_import('neutron.openstack.common.rpc')
+#rpc = importutils.try_import('neutron.openstack.common.rpc')
+# TODO(ihrachys): restore once oslo-rpc code is removed from the tree
+rpc = None
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import excutils
from neutron.openstack.common import log
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.bigswitch import config as pl_config
LOG = log.getLogger(__name__)
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
- self.dispatcher = dispatcher.RpcDispatcher([self])
+ self.dispatcher = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.common import constants as const
from neutron.common import exceptions
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
RPC_API_VERSION = '1.1'
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsfactory
consumers)
def _create_rpc_dispatcher(self):
- rpc_callback = HyperVSecurityCallbackMixin(self)
- return dispatcher.RpcDispatcher([rpc_callback])
+ return [HyperVSecurityCallbackMixin(self)]
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
segmentation_id, port['admin_state_up'])
def _create_rpc_dispatcher(self):
- return dispatcher.RpcDispatcher([self])
+ return [self]
def _get_vswitch_name(self, network_type, physical_network):
if network_type != p_const.TYPE_LOCAL:
# @author: Alessandro Pilotti, Cloudbase Solutions Srl
from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.ibm.common import config # noqa
from neutron.plugins.ibm.common import constants
"out-of-band")
def create_rpc_dispatcher(self):
- return dispatcher.RpcDispatcher([self])
+ return [self]
def setup_integration_br(self, bridge_name, reset_br, out_of_band,
controller_ip=None):
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return n_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
def sdnve_info(self, rpc_context, **kwargs):
'''Update new information.'''
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.common import config # noqa
from neutron.plugins.linuxbridge.common import constants as lconst
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return dispatcher.RpcDispatcher([self])
+ return [self]
class LinuxBridgePluginApi(agent_rpc.PluginApi,
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
- return n_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
class MidonetPluginException(n_exc.NeutronException):
# License for the specific language governing permissions and limitations
# under the License.
+from oslo import messaging
+
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
+ # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to
+ # inheritance problems
+ target = messaging.Target(version=RPC_API_VERSION)
+
def __init__(self, notifier, type_manager):
# REVISIT(kmestery): This depends on the first three super classes
# not having their own __init__ functions. If an __init__() is added
# to one, this could break. Fix this and add a unit test to cover this
# test in H3.
- # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due
- # to inheritance problems
super(RpcCallbacks, self).__init__(notifier, type_manager)
def create_rpc_dispatcher(self):
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def _device_to_port_id(cls, device):
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.mlnx.agent import utils
from neutron.plugins.mlnx.common import config # noqa
or support more than one class as the target of rpc messages,
override this method.
"""
- return dispatcher.RpcDispatcher([self])
+ return [self]
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
from oslo.config import cfg
from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import api as db_api
or support more than one class as the target of RPC messages,
override this method.
"""
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.nec.common import config
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
- self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec,
- self.callback_sg])
+ self.dispatcher = [self.callback_nec, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
from oslo.config import cfg
from neutron.agent.common import config
-from neutron.openstack.common import rpc # noqa
from neutron.plugins.nec.common import constants as nconst
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
- callbacks = [NECPluginV2RPCCallbacks(self.safe_reference),
- DhcpRpcCallback(),
- L3RpcCallback(),
- self.callback_sg,
- agents_db.AgentExtRpcCallback()]
- self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
+ self.dispatcher = [
+ NECPluginV2RPCCallbacks(self.safe_reference),
+ DhcpRpcCallback(),
+ L3RpcCallback(),
+ self.callback_sg,
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self])
+ return [self]
def update_ports(self, rpc_context, **kwargs):
"""Update ports' information and activate/deavtivate them.
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.ofagent.common import config # noqa
from neutron.plugins.openvswitch.common import constants
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
- return dispatcher.RpcDispatcher([self])
+ return [self]
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
segmentation_id, ofports):
from neutron import context as n_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.oneconvergence.lib import config
LOG = logging.getLogger(__name__)
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
- self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc,
- self.callback_sg])
+ self.dispatcher = [self.callback_oc, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import exceptions as nexception
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
def create_rpc_dispatcher(self):
"""Get the rpc dispatcher for this manager."""
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@staticmethod
def get_port_from_device(device):
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import config # noqa
from neutron.plugins.openvswitch.common import constants
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return dispatcher.RpcDispatcher([self])
+ return [self]
def provision_local_vlan(self, net_uuid, network_type, physical_network,
segmentation_id):
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
@classmethod
def get_port_from_device(cls, device):
from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log
-from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.ryu.common import config # noqa
consumers)
def _create_rpc_dispatcher(self):
- return dispatcher.RpcDispatcher([self])
+ return [self]
def _setup_integration_br(self, root_helper, integ_br,
tunnel_ip, ovsdb_port, ovsdb_ip):
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import api as db
self.ofp_rest_api_addr = ofp_rest_api_addr
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self])
+ return [self]
def get_ofp_rest_api(self, context, **kwargs):
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as ntn_exc
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return n_rpc.PluginRpcDispatcher([self,
- agents_db.AgentExtRpcCallback()])
+ return [self, agents_db.AgentExtRpcCallback()]
def handle_network_dhcp_access(plugin, context, network, action):
from neutron.api.v2 import attributes
from neutron.common import exceptions
import neutron.common.utils as utils
-from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
# resource is handled by the core plugin. It might be worth
# having a way to map resources to plugins so to make this
# check more general
+ # FIXME(ihrachys): if import is put in global, circular
+ # import failure occurs
+ from neutron import manager
f = getattr(manager.NeutronManager.get_instance().plugin,
'get_%s' % parent_res)
# f *must* exist, if not found it is better to let neutron
# License for the specific language governing permissions and limitations
# under the License.
-import eventlet
import inspect
import logging as std_logging
import os
import random
from oslo.config import cfg
+from oslo.messaging import server as rpc_server
from neutron.common import config
from neutron.common import rpc_compat
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
- self._server = None
+ self._servers = []
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are
# discovered to be broken.
session.get_engine().pool.dispose()
- self._server = self._plugin.start_rpc_listener()
+ self._servers = self._plugin.start_rpc_listener()
def wait(self):
- if isinstance(self._server, eventlet.greenthread.GreenThread):
- self._server.wait()
+ for server in self._servers:
+ if isinstance(server, rpc_server.MessageHandlingServer):
+ server.wait()
def stop(self):
- if isinstance(self._server, eventlet.greenthread.GreenThread):
- self._server.kill()
- self._server = None
+ for server in self._servers:
+ if isinstance(server, rpc_server.MessageHandlingServer):
+ server.kill()
+ self._servers = []
def serve_rpc():
from oslo.config import cfg
from neutron.common import exceptions as n_exception
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron import context as neutron_context
self.plugin = plugin
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self])
+ return [self]
def set_firewall_status(self, context, firewall_id, status, **kwargs):
"""Agent uses this to set a firewall's status."""
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import api as qdbapi
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
"""
- return q_rpc.PluginRpcDispatcher([self])
+ return [self]
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.db import agents_db
self.plugin = plugin
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher(
- [self, agents_db.AgentExtRpcCallback(self.plugin)])
+ return [self, agents_db.AgentExtRpcCallback(self.plugin)]
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as constants
+from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import periodic_task
from neutron.openstack.common import service
from neutron import service as neutron_service
'host': self.host}
LOG.debug(_("Send metering report: %s"), data)
- notifier_api.notify(self.context,
- notifier_api.publisher_id('metering'),
- 'l3.meter',
- notifier_api.CONF.default_notification_level,
- data)
+ notifier = n_rpc.get_notifier('metering')
+ notifier.info(self.context, 'l3.meter', data)
info['pkts'] = 0
info['bytes'] = 0
info['time'] = 0
import netaddr
from oslo.config import cfg
+from oslo import messaging
import six
from neutron.common import exceptions
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron import context as ctx
from neutron.openstack.common import lockutils
# history
# 1.0 Initial version
-
RPC_API_VERSION = '1.0'
+ # TODO(ihrachys): we can't use RpcCallback here due to inheritance
+ # issues
+ target = messaging.Target(version=RPC_API_VERSION)
+
def __init__(self, agent, host):
- # TODO(ihrachys): we can't use RpcCallback here due to
- # inheritance issues
self.host = host
self.conn = rpc_compat.create_connection(new=True)
context = ctx.get_admin_context_without_session()
for k, v in csrs_found.items()])
def create_rpc_dispatcher(self):
- return n_rpc.PluginRpcDispatcher([self])
+ return [self]
def vpnservice_updated(self, context, **kwargs):
"""Handle VPNaaS service driver change notifications."""
import jinja2
import netaddr
from oslo.config import cfg
+from oslo import messaging
import six
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
-from neutron.common import rpc as q_rpc
from neutron.common import rpc_compat
from neutron import context
from neutron.openstack.common import lockutils
RPC_API_VERSION = '1.0'
+ # TODO(ihrachys): we can't use RpcCallback here due to inheritance
+ # issues
+ target = messaging.Target(version=RPC_API_VERSION)
+
def __init__(self, agent, host):
- # TODO(ihrachys): we can't use RpcCallback here due to
- # inheritance issues
self.agent = agent
self.conf = self.agent.conf
self.root_helper = self.agent.root_helper
interval=self.conf.ipsec.ipsec_status_check_interval)
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self])
+ return [self]
def _update_nat(self, vpnservice, func):
"""Setting up nat rule in iptables.
from netaddr import core as net_exc
from neutron.common import exceptions
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
self.driver = driver
def create_rpc_dispatcher(self):
- return n_rpc.PluginRpcDispatcher([self])
+ return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Retuns info on the vpnservices on the host."""
# under the License.
import netaddr
-from neutron.common import rpc as n_rpc
from neutron.common import rpc_compat
from neutron.openstack.common import log as logging
from neutron.services.vpn.common import topics
self.driver = driver
def create_rpc_dispatcher(self):
- return n_rpc.PluginRpcDispatcher([self])
+ return [self]
def get_vpn_services_on_host(self, context, host=None):
"""Returns the vpnservices on the host."""
import fixtures
import mock
from oslo.config import cfg
+from oslo.messaging import conffixture as messaging_conffixture
import testtools
from neutron.common import config
+from neutron.common import rpc as n_rpc
from neutron.db import agentschedulers_db
from neutron import manager
-from neutron.openstack.common.notifier import api as notifier_api
-from neutron.openstack.common.notifier import test_notifier
-from neutron.openstack.common import rpc
-from neutron.openstack.common.rpc import impl_fake
+from neutron.tests import fake_notifier
from neutron.tests import post_mortem_debug
return True
+def fake_consume_in_threads(self):
+ return []
+
+
class BaseTestCase(testtools.TestCase):
def cleanup_core_plugin(self):
if core_plugin is not None:
cfg.CONF.set_override('core_plugin', core_plugin)
- def _cleanup_test_notifier(self):
- test_notifier.NOTIFICATIONS = []
-
def setup_notification_driver(self, notification_driver=None):
- # to reload the drivers
- self.addCleanup(notifier_api._reset_drivers)
- self.addCleanup(self._cleanup_test_notifier)
- notifier_api._reset_drivers()
+ self.addCleanup(fake_notifier.reset)
if notification_driver is None:
- notification_driver = [test_notifier.__name__]
+ notification_driver = [fake_notifier.__name__]
cfg.CONF.set_override("notification_driver", notification_driver)
@staticmethod
else:
conf(args)
- def _cleanup_rpc_backend(self):
- rpc._RPCIMPL = None
- impl_fake.CONSUMERS.clear()
-
def setUp(self):
super(BaseTestCase, self).setUp()
# test-specific cleanup has a chance to release references.
self.addCleanup(self.cleanup_core_plugin)
- self.addCleanup(self._cleanup_rpc_backend)
-
# Configure this first to ensure pm debugging support for setUp()
if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING:
self.addOnException(post_mortem_debug.exception_handler)
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
+ # don't actually start RPC listeners when testing
+ self.useFixture(fixtures.MonkeyPatch(
+ 'neutron.common.rpc_compat.Connection.consume_in_thread',
+ fake_consume_in_threads))
+
+ self.useFixture(fixtures.MonkeyPatch(
+ 'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
+
+ self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
+ self.messaging_conf.transport_driver = 'fake'
+ self.messaging_conf.response_timeout = 15
+ self.useFixture(self.messaging_conf)
+
+ self.addCleanup(n_rpc.clear_extra_exmods)
+ n_rpc.add_extra_exmods('neutron.test')
+
+ self.addCleanup(n_rpc.cleanup)
+ n_rpc.init(CONF)
+
if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
raise self.skipException('XML Testing Skipped in Py26')
--- /dev/null
+# Copyright 2014 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import functools
+
+
+NOTIFICATIONS = []
+
+
+def reset():
+ del NOTIFICATIONS[:]
+
+
+FakeMessage = collections.namedtuple('Message',
+ ['publisher_id', 'priority',
+ 'event_type', 'payload'])
+
+
+class FakeNotifier(object):
+
+ def __init__(self, transport, publisher_id=None):
+ self.transport = transport
+ self.publisher_id = publisher_id
+ for priority in ('debug', 'info', 'warn', 'error', 'critical'):
+ setattr(self, priority,
+ functools.partial(self._notify, priority=priority.upper()))
+
+ def prepare(self, publisher_id=None):
+ if publisher_id is None:
+ publisher_id = self.publisher_id
+ return self.__class__(self.transport, publisher_id)
+
+ def _notify(self, ctxt, event_type, payload, priority):
+ msg = dict(publisher_id=self.publisher_id,
+ priority=priority,
+ event_type=event_type,
+ payload=payload)
+ NOTIFICATIONS.append(msg)
import mock
from neutron.agent import rpc as agent_rpc
+from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import context
-from neutron.openstack.common import rpc
from neutron.plugins.hyperv import agent_notifier_api as ana
from neutron.plugins.hyperv.common import constants
from neutron.tests import base
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
- with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
+ proxy = rpc_compat.RpcProxy
+ with mock.patch.object(proxy, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg]
- for arg, expected_arg in zip(rpc_method_mock.call_args[0],
- expected_args):
- self.assertEqual(arg, expected_arg)
+ expected = [
+ mock.call(ctxt, expected_msg, topic=topic)
+ ]
+ rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
- 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+ 'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+ _fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
- expected_args = [ctxt, topic, expected_msg]
+ expected_args = [ctxt, expected_msg]
+ expected_kwargs = {'topic': topic}
- for arg, expected_arg in zip(self.fake_args, expected_args):
+ # skip the first argument which is 'self'
+ for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(expected_arg, arg)
+ self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = plb.AgentNotifierApi(topics.AGENT)
import mock
from neutron.agent import rpc as agent_rpc
+from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import context
-from neutron.openstack.common import rpc
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.plugins.ml2 import rpc as plugin_rpc
from neutron.tests import base
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
+ rpc = rpc_compat.RpcProxy
with mock.patch.object(rpc, rpc_method) as rpc_method_mock:
rpc_method_mock.return_value = expected_retval
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
-
- expected_args = [ctxt, topic, expected_msg]
- for arg, expected_arg in zip(rpc_method_mock.call_args[0],
- expected_args):
- self.assertEqual(arg, expected_arg)
+ expected = [
+ mock.call(ctxt, expected_msg, topic=topic)
+ ]
+ rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
- 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+ 'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+ _fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval)
- expected_args = [ctxt, topic, expected_msg]
+ expected_args = [ctxt, expected_msg]
+ expected_kwargs = {'topic': topic}
- for arg, expected_arg in zip(self.fake_args, expected_args):
+ # skip the first argument which is 'self'
+ for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(expected_arg, arg)
+ self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
return expected_retval
self.useFixture(fixtures.MonkeyPatch(
- 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method))
+ 'neutron.common.rpc_compat.RpcProxy.' + rpc_method,
+ _fake_rpc_method))
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
- expected_args = [ctxt, topic, expected_msg]
+ expected_args = [ctxt, expected_msg]
+ expected_kwargs = {'topic': topic}
- for arg, expected_arg in zip(self.fake_args, expected_args):
+ # skip the first argument which is 'self'
+ for arg, expected_arg in zip(self.fake_args[1:], expected_args):
self.assertEqual(arg, expected_arg)
+ self.assertEqual(expected_kwargs, self.fake_kwargs)
def test_delete_network(self):
rpcapi = povs.AgentNotifierApi(topics.AGENT)
from oslo.config import cfg
from neutron.agent.common import config
-from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.services.metering.agents import metering_agent
from neutron.tests import base
+from neutron.tests import fake_notifier
_uuid = uuidutils.generate_uuid
'bytes': 444}}
self.agent._metering_loop()
- self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0)
- for n in test_notifier.NOTIFICATIONS:
+ self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ for n in fake_notifier.NOTIFICATIONS:
if n['event_type'] == 'l3.meter':
break
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo'
- with mock.patch('neutron.openstack.common.rpc.call') as rpc_call:
+ with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call:
rpc_call.return_value = expect_val
func_obj = getattr(agent, method)
if method == 'tunnel_sync':
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
-from neutron.openstack.common.notifier import api as notifer_api
from neutron.openstack.common import policy as common_policy
from neutron.openstack.common import uuidutils
from neutron import policy
from neutron import quota
from neutron.tests import base
+from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
class NotificationTest(APIv2TestBase):
- def _resource_op_notifier(self, opname, resource, expected_errors=False,
- notification_level='INFO'):
+
+ def setUp(self):
+ super(NotificationTest, self).setUp()
+ fake_notifier.reset()
+
+ def _resource_op_notifier(self, opname, resource, expected_errors=False):
initial_input = {resource: {'name': 'myname'}}
instance = self.plugin.return_value
instance.get_networks.return_value = initial_input
instance.get_networks_count.return_value = 0
expected_code = exc.HTTPCreated.code
- with mock.patch.object(notifer_api, 'notify') as mynotifier:
- if opname == 'create':
- initial_input[resource]['tenant_id'] = _uuid()
- res = self.api.post_json(
- _get_path('networks'),
- initial_input, expect_errors=expected_errors)
- if opname == 'update':
- res = self.api.put_json(
- _get_path('networks', id=_uuid()),
- initial_input, expect_errors=expected_errors)
- expected_code = exc.HTTPOk.code
- if opname == 'delete':
- initial_input[resource]['tenant_id'] = _uuid()
- res = self.api.delete(
- _get_path('networks', id=_uuid()),
- expect_errors=expected_errors)
- expected_code = exc.HTTPNoContent.code
- expected = [mock.call(mock.ANY,
- 'network.' + cfg.CONF.host,
- resource + "." + opname + ".start",
- notification_level,
- mock.ANY),
- mock.call(mock.ANY,
- 'network.' + cfg.CONF.host,
- resource + "." + opname + ".end",
- notification_level,
- mock.ANY)]
- self.assertEqual(expected, mynotifier.call_args_list)
+ if opname == 'create':
+ initial_input[resource]['tenant_id'] = _uuid()
+ res = self.api.post_json(
+ _get_path('networks'),
+ initial_input, expect_errors=expected_errors)
+ if opname == 'update':
+ res = self.api.put_json(
+ _get_path('networks', id=_uuid()),
+ initial_input, expect_errors=expected_errors)
+ expected_code = exc.HTTPOk.code
+ if opname == 'delete':
+ initial_input[resource]['tenant_id'] = _uuid()
+ res = self.api.delete(
+ _get_path('networks', id=_uuid()),
+ expect_errors=expected_errors)
+ expected_code = exc.HTTPNoContent.code
+
+ expected_events = ('.'.join([resource, opname, "start"]),
+ '.'.join([resource, opname, "end"]))
+ self.assertEqual(len(fake_notifier.NOTIFICATIONS),
+ len(expected_events))
+ for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events):
+ self.assertEqual('INFO', msg['priority'])
+ self.assertEqual(event, msg['event_type'])
+
self.assertEqual(res.status_int, expected_code)
def test_network_create_notifer(self):
def test_network_update_notifer(self):
self._resource_op_notifier('update', 'network')
- def test_network_create_notifer_with_log_level(self):
- cfg.CONF.set_override('default_notification_level', 'DEBUG')
- self._resource_op_notifier('create', 'network',
- notification_level='DEBUG')
-
class DHCPNotificationTest(APIv2TestBase):
def _test_dhcp_notifier(self, opname, resource, initial_input=None):
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
-from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
+from neutron.tests import fake_notifier
from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_api_v2_extension
'subnet.create.end',
'router.interface.create',
'router.interface.delete']
- test_notifier.NOTIFICATIONS = []
+ fake_notifier.reset()
with self.router() as r:
with self.subnet() as s:
body = self._router_interface_action('add',
self.assertEqual(
set(exp_notifications),
- set(n['event_type'] for n in test_notifier.NOTIFICATIONS))
+ set(n['event_type'] for n in fake_notifier.NOTIFICATIONS))
- for n in test_notifier.NOTIFICATIONS:
+ for n in fake_notifier.NOTIFICATIONS:
if n['event_type'].startswith('router.interface.'):
payload = n['payload']['router_interface']
self.assertIn('id', payload)
six>=1.7.0
stevedore>=0.14
oslo.config>=1.2.1
+oslo.messaging>=1.3.0
oslo.rootwrap
python-novaclient>=2.17.0
fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver
neutron.openstack.common.cache.backends =
memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
+# These are for backwards compat with Icehouse notification_driver configuration values
+oslo.messaging.notify.drivers =
+ neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver
+ neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver
+ neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver
+ neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver
+ neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver
+
[build_sphinx]
all_files = 1