LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
- dispatcher = [self.manager]
+ endpoints = [self.manager]
# Share this same connection for these Consumers
- self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+ self.conn.create_consumer(self.topic, endpoints, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+ self.conn.create_consumer(node_topic, endpoints, fanout=False)
- self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+ self.conn.create_consumer(self.topic, endpoints, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
def __init__(self, meter_plugin):
self.meter_plugin = meter_plugin
- def create_rpc_dispatcher(self):
- return [self]
-
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
- self.dispatcher = [self]
+ self.endpoints = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- return [self, agents_db.AgentExtRpcCallback()]
-
def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)
port = self.get_port_and_sgs(port_id)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self._dhcp_agent_notifier
)
- self.callbacks = RestProxyCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
+ self.endpoints = [RestProxyCallbacks(),
+ agents_db.AgentExtRpcCallback()]
+ self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
# 1.1 Support Security Group RPC
TAP_PREFIX_LEN = 3
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- """
- return [self, agents_db.AgentExtRpcCallback()]
-
@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""
self.rpc_context = context.RequestContext('neutron', 'neutron',
is_admin=False)
self.conn = rpc_compat.create_connection(new=True)
- self.callbacks = BridgeRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [BridgeRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)
# Set RPC API version to 1.1 by default.
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this rpc manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- """
- return [self, agents_db.AgentExtRpcCallback()]
-
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
- self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher()
+ self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
# Consume from all consumers in threads
def _setup_rpc(self):
self.topic = topics.AGENT
- self.dispatcher = self._create_rpc_dispatcher()
+ self.endpoints = [HyperVSecurityCallbackMixin(self)]
consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
- def _create_rpc_dispatcher(self):
- return [HyperVSecurityCallbackMixin(self)]
-
class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback,
sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.dispatcher = self._create_rpc_dispatcher()
+ self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
network_type, physical_network,
segmentation_id, port['admin_state_up'])
- def _create_rpc_dispatcher(self):
- return [self]
-
def _get_vswitch_name(self, network_type, physical_network):
if network_type != p_const.TYPE_LOCAL:
vswitch_name = self._get_vswitch_for_physical_network(
self.conn = rpc_compat.create_connection(new=True)
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
- self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
from neutron.common import constants as q_const
from neutron.common import rpc_compat
-from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
from neutron.openstack.common import log as logging
self.notifier = notifier
self._db = hyperv_db.HyperVPluginDB()
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.context = context.get_admin_context_without_session()
- self.dispatcher = self.create_rpc_dispatcher()
+ self.endpoints = [self]
consumers = [[constants.INFO, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
if self.polling_interval:
"connection-mode",
"out-of-band")
- def create_rpc_dispatcher(self):
- return [self]
-
def setup_integration_br(self, bridge_name, reset_br, out_of_band,
controller_ip=None):
'''Sets up the integration bridge.
def __init__(self, notifier):
self.notifier = notifier # used to notify the agent
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
def sdnve_info(self, rpc_context, **kwargs):
'''Update new information.'''
info = kwargs.get('info')
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
- self.callbacks = SdnveRpcCallbacks(self.notifier)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
+ self.endpoints = [SdnveRpcCallbacks(self.notifier),
+ agents_db.AgentExtRpcCallback()]
+ self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
getattr(self, method)(context, values)
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self]
-
class LinuxBridgePluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.callbacks = LinuxBridgeRpcCallbacks(self.context,
- self)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION,
topics.UPDATE, cfg.CONF.host])
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
# Device names start with "tap"
TAP_PREFIX_LEN = 3
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
- self.callbacks = LinuxBridgeRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [LinuxBridgeRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)
dhcp_rpc_base.DhcpRpcCallbackMixin):
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- This a basic implementation that will call the plugin like get_ports
- and handle basic events
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- """
- return [self, agents_db.AgentExtRpcCallback()]
-
class MidonetPluginException(n_exc.NeutronException):
message = _("%(msg)s")
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
- self.callbacks = MidoRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
+ self.endpoints = [MidoRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
+ self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
from neutron.common import exceptions as exc
from neutron.common import rpc_compat
from neutron.common import topics
+from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
)
def start_rpc_listeners(self):
- self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
+ self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
+ agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
+ self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
from neutron.common import constants as q_const
from neutron.common import rpc_compat
from neutron.common import topics
-from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
# test in H3.
super(RpcCallbacks, self).__init__(notifier, type_manager)
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
@classmethod
def _device_to_port_id(cls, device):
# REVISIT(rkukura): Consider calling into MechanismDrivers to
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version,
- or support more than one class as the target of rpc messages,
- override this method.
- """
- return [self]
-
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.callbacks = MlnxEswitchRpcCallbacks(self.context,
- self)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils
+from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
- self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
from neutron.common import constants as q_const
from neutron.common import rpc_compat
-from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
#to be compatible with Linux Bridge Agent on Network Node
TAP_PREFIX_LEN = 3
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- If a manager would like to set an RPC API version,
- or support more than one class as the target of RPC messages,
- override this method.
- """
- return [self, agents_db.AgentExtRpcCallback()]
-
@classmethod
def get_port_from_device(cls, device):
"""Get port according to device.
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
- self.dispatcher = [self.callback_nec, self.callback_sg]
+ self.endpoints = [self.callback_nec, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
- self.dispatcher = [
+ self.endpoints = [
NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
super(NECPluginV2RPCCallbacks, self).__init__()
self.plugin = plugin
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self]
-
def update_ports(self, rpc_context, **kwargs):
"""Update ports' information and activate/deavtivate them.
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.dispatcher = self.create_rpc_dispatcher()
+ self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
return
self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- """
- return [self]
-
def _provision_local_vlan_outbound_for_tunnel(self, lvid,
segmentation_id, ofports):
br = self.tun_br
self, self.sg_agent)
self.callback_sg = SecurityGroupAgentRpcCallback(self.context,
self.sg_agent)
- self.dispatcher = [self.callback_oc, self.callback_sg]
+ self.endpoints = [self.callback_oc, self.callback_sg]
# Define the listening consumer for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager."""
- return [self, agents_db.AgentExtRpcCallback()]
-
@staticmethod
def get_port_from_device(device):
port = nvsd_db.get_port_from_device(device)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
- self.callbacks = NVSDPluginRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [NVSDPluginRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
- self.dispatcher = self.create_rpc_dispatcher()
+ self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
if self.l2_pop:
consumers.append([topics.L2POPULATION,
topics.UPDATE, cfg.CONF.host])
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
else:
LOG.warning(_('Action %s not supported'), action)
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self]
-
def provision_local_vlan(self, net_uuid, network_type, physical_network,
segmentation_id):
'''Provisions a local VLAN.
self.notifier = notifier
self.tunnel_type = tunnel_type
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
@classmethod
def get_port_from_device(cls, device):
port = ovs_db_v2.get_port_from_device(device)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
- self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
+ agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.topic = topics.AGENT
self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
- self.dispatcher = self._create_rpc_dispatcher()
+ self.endpoints = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
- self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
- def _create_rpc_dispatcher(self):
- return [self]
-
def _setup_integration_br(self, root_helper, integ_br,
tunnel_ip, ovsdb_port, ovsdb_ip):
self.int_br = OVSBridge(integ_br, root_helper)
super(RyuRpcCallbacks, self).__init__()
self.ofp_rest_api_addr = ofp_rest_api_addr
- def create_rpc_dispatcher(self):
- return [self]
-
def get_ofp_rest_api(self, context, **kwargs):
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc_compat.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
- self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
for svc_topic in self.service_topics.values():
- self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
+ self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
def _create_all_tenant_network(self):
from neutron.common import constants as const
from neutron.common import exceptions as ntn_exc
from neutron.common import rpc_compat
-from neutron.db import agents_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
from neutron.db import l3_db
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- '''Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- '''
- return [self, agents_db.AgentExtRpcCallback()]
-
def handle_network_dhcp_access(plugin, context, network, action):
pass
from neutron.common import constants as const
from neutron.common import rpc_compat
from neutron.common import topics
+from neutron.db import agents_db
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.vmware.common import config
def _setup_rpc_dhcp_metadata(self, notifier=None):
self.topic = topics.PLUGIN
self.conn = rpc_compat.create_connection(new=True)
- self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
+ self.endpoints = [nsx_rpc.NSXRpcCallbacks(),
+ agents_db.AgentExtRpcCallback()]
+ self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
self.conn.consume_in_threads()
super(FirewallCallbacks, self).__init__()
self.plugin = plugin
- def create_rpc_dispatcher(self):
- return [self]
-
def set_firewall_status(self, context, firewall_id, status, **kwargs):
"""Agent uses this to set a firewall's status."""
LOG.debug(_("set_firewall_status() called"))
"""Do the initialization for the firewall service plugin here."""
qdbapi.register_models()
- self.callbacks = FirewallCallbacks(self)
+ self.endpoints = [FirewallCallbacks(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
- topics.FIREWALL_PLUGIN,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
+ topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = FirewallAgentApi(
RPC_API_VERSION = '1.1'
- def create_rpc_dispatcher(self):
- """Get the rpc dispatcher for this manager.
-
- If a manager would like to set an rpc API version, or support more than
- one class as the target of rpc messages, override this method.
- """
- return [self]
-
class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
self.conn = rpc_compat.create_connection(new=True)
self.agent_notifiers.update(
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
- self.callbacks = L3RouterPluginRpcCallbacks()
- self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
+ self.endpoints = [L3RouterPluginRpcCallbacks()]
+ self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
self.conn.consume_in_threads()
super(LoadBalancerCallbacks, self).__init__()
self.plugin = plugin
- def create_rpc_dispatcher(self):
- return [self, agents_db.AgentExtRpcCallback(self.plugin)]
-
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
agents = self.plugin.get_lbaas_agents(context,
if hasattr(self.plugin, 'agent_callbacks'):
return
- self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
+ self.plugin.agent_endpoints = [
+ LoadBalancerCallbacks(self.plugin),
+ agents_db.AgentExtRpcCallback(self.plugin)
+ ]
self.plugin.conn = rpc_compat.create_connection(new=True)
self.plugin.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
- self.plugin.agent_callbacks.create_rpc_dispatcher(),
+ self.plugin.agent_endpoints,
fanout=False)
self.plugin.conn.consume_in_threads()
def __init__(self):
super(MeteringPlugin, self).__init__()
- self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
+ self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
- topics.METERING_PLUGIN,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
+ topics.METERING_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
self.service_state = {}
- self.conn.create_consumer(
- node_topic,
- self.create_rpc_dispatcher(),
- fanout=False)
+ self.endpoints = [self]
+ self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = (
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
v['timeout']))
for k, v in csrs_found.items()])
- def create_rpc_dispatcher(self):
- return [self]
-
def vpnservice_updated(self, context, **kwargs):
"""Handle VPNaaS service driver change notifications."""
LOG.debug(_("Handling VPN service update notification '%s'"),
self.processes = {}
self.process_status_cache = {}
- self.conn.create_consumer(
- node_topic,
- self.create_rpc_dispatcher(),
- fanout=False)
+ self.endpoints = [self]
+ self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
self.process_status_cache_check.start(
interval=self.conf.ipsec.ipsec_status_check_interval)
- def create_rpc_dispatcher(self):
- return [self]
-
def _update_nat(self, vpnservice, func):
"""Setting up nat rule in iptables.
super(CiscoCsrIPsecVpnDriverCallBack, self).__init__()
self.driver = driver
- def create_rpc_dispatcher(self):
- return [self]
-
def get_vpn_services_on_host(self, context, host=None):
"""Retuns info on the vpnservices on the host."""
plugin = self.driver.service_plugin
def __init__(self, service_plugin):
super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin)
- self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self)
+ self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
- topics.CISCO_IPSEC_DRIVER_TOPIC,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
+ topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
super(IPsecVpnDriverCallBack, self).__init__()
self.driver = driver
- def create_rpc_dispatcher(self):
- return [self]
-
def get_vpn_services_on_host(self, context, host=None):
"""Returns the vpnservices on the host."""
plugin = self.driver.service_plugin
def __init__(self, service_plugin):
super(IPsecVPNDriver, self).__init__(service_plugin)
- self.callbacks = IPsecVpnDriverCallBack(self)
+ self.endpoints = [IPsecVpnDriverCallBack(self)]
self.conn = rpc_compat.create_connection(new=True)
self.conn.create_consumer(
- topics.IPSEC_DRIVER_TOPIC,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
+ topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = IPsecVpnAgentApi(
topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin'
NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi'
-CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks'
-DISPATCHER = CALLBACKS + '.create_rpc_dispatcher'
CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert'
SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection'
def setup_patches(self):
self.plugin_notifier_p = mock.patch(NOTIFIER)
- # prevent rpc callback dispatcher from being created
- self.callbacks_p = mock.patch(DISPATCHER,
- new=lambda *args, **kwargs: None)
# prevent any greenthreads from spawning
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
# prevent the consistency watchdog from starting
self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None)
self.addCleanup(db.clear_db)
- self.callbacks_p.start()
self.plugin_notifier_p.start()
self.spawn_p.start()
self.watch_p.start()
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
plugin = manager.NeutronManager.get_plugin()
self.notifier = plugin.notifier
- self.rpc = plugin.callbacks
+ self.rpc = plugin.endpoints[0]
self.startHttpPatch()
bound, status)
port_id = port['port']['id']
neutron_context = context.get_admin_context()
- details = self.plugin.callbacks.get_device_details(
+ details = self.plugin.endpoints[0].get_device_details(
neutron_context, agent_id="theAgentId", device=port_id)
if bound:
self.assertEqual(details['network_type'], 'local')
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device(port_id)
+ callbacks = plugin.endpoints[0]
+ port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+ port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device(port_id)
+ callbacks = plugin.endpoints[0]
+ port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+ port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device(port_id)
+ callbacks = plugin.endpoints[0]
+ port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+ port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device(port_id)
+ port_dict = plugin.endpoints[0].get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin()
- port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
+ port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id')
self.assertIsNone(port_dict)
def setUp(self):
super(TestFirewallCallbacks,
self).setUp(fw_plugin=FW_PLUGIN_KLASS)
- self.callbacks = self.plugin.callbacks
+ self.callbacks = self.plugin.endpoints[0]
def test_set_firewall_status(self):
ctx = context.get_admin_context()
def setUp(self):
super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
- self.callbacks = self.plugin.callbacks
+ self.callbacks = self.plugin.endpoints[0]
def test_create_second_firewall_not_permitted(self):
with self.firewall():
for k, v in attrs.iteritems():
self.assertEqual(fw_db[k], v)
# cleanup the pending firewall
- self.plugin.callbacks.firewall_deleted(ctx, fw_id)
+ self.plugin.endpoints[0].firewall_deleted(ctx, fw_id)
def test_delete_firewall_after_agent_delete(self):
ctx = context.get_admin_context()
class AgentRPCMethods(base.BaseTestCase):
def test_create_consumers(self):
- dispatcher = mock.Mock()
+ endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
- mock.call().create_consumer('foo-topic-op', dispatcher,
+ mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
- rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
+ rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
def test_create_consumers_with_node_name(self):
- dispatcher = mock.Mock()
+ endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
- mock.call().create_consumer('foo-topic-op', dispatcher,
+ mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
- mock.call().create_consumer('foo-topic-op.node1', dispatcher,
+ mock.call().create_consumer('foo-topic-op.node1', endpoints,
fanout=False),
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
- rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')])
+ rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)