import random
+from oslo import messaging
+
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
LOG = logging.getLogger(__name__)
-class L3AgentNotifyAPI(n_rpc.RpcProxy):
+class L3AgentNotifyAPI(object):
"""API for plugin to notify L3 agent."""
- BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.L3_AGENT):
- super(L3AgentNotifyAPI, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def _notification_host(self, context, method, payload, host):
"""Notify the agent that is hosting the router."""
LOG.debug('Nofity agent at %(host)s the message '
'%(method)s', {'host': host,
'method': method})
- self.cast(
- context, self.make_msg(method,
- payload=payload),
- topic='%s.%s' % (topics.L3_AGENT, host))
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, method, payload=payload)
def _agent_notification(self, context, method, router_ids, operation,
shuffle_agents):
{'topic': l3_agent.topic,
'host': l3_agent.host,
'method': method})
- self.cast(
- context, self.make_msg(method,
- routers=[router_id]),
- topic='%s.%s' % (l3_agent.topic, l3_agent.host),
- version='1.1')
+ cctxt = self.client.prepare(topic=l3_agent.topic,
+ server=l3_agent.host,
+ version='1.1')
+ cctxt.cast(context, method, routers=[router_id])
def _agent_notification_arp(self, context, method, router_id,
operation, data):
# TODO(murali): replace cast with fanout to avoid performance
# issues at greater scale.
for l3_agent in l3_agents:
- topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
+ log_topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
LOG.debug('Casting message %(method)s with topic %(topic)s',
- {'topic': topic, 'method': method})
+ {'topic': log_topic, 'method': method})
dvr_arptable = {'router_id': router_id,
'arp_table': data}
- self.cast(context,
- self.make_msg(method, payload=dvr_arptable),
- topic=topic, version='1.2')
+ cctxt = self.client.prepare(topic=l3_agent.topic,
+ server=l3_agent.host,
+ version='1.2')
+ cctxt.cast(context, method, payload=dvr_arptable)
def _notification(self, context, method, router_ids, operation,
shuffle_agents):
self._agent_notification(
context, method, router_ids, operation, shuffle_agents)
else:
- self.fanout_cast(
- context, self.make_msg(method,
- routers=router_ids),
- topic=topics.L3_AGENT)
+ cctxt = self.client.prepare(fanout=True)
+ cctxt.cast(context, method, routers=router_ids)
def _notification_fanout(self, context, method, router_id):
"""Fanout the deleted router to all L3 agents."""
{'topic': topics.L3_AGENT,
'method': method,
'router_id': router_id})
- self.fanout_cast(
- context, self.make_msg(method,
- router_id=router_id),
- topic=topics.L3_AGENT)
+ cctxt = self.client.prepare(fanout=True)
+ cctxt.cast(context, method, router_id=router_id)
def agent_updated(self, context, admin_state_up, host):
self._notification_host(context, 'agent_updated',
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
- with mock.patch.object(l3_notifier, 'cast') as mock_l3:
- with self.router() as router1:
- self._register_agent_states()
- hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
- L3_HOSTA)
- self._add_router_to_l3_agent(hosta_id,
- router1['router']['id'])
- routers = [router1['router']['id']]
- mock_l3.assert_called_with(
- mock.ANY,
- l3_notifier.make_msg(
- 'router_added_to_agent',
- payload=routers),
- topic='l3_agent.hosta')
+ with contextlib.nested(
+ mock.patch.object(l3_notifier.client, 'prepare',
+ return_value=l3_notifier.client),
+ mock.patch.object(l3_notifier.client, 'cast'),
+ self.router(),
+ ) as (
+ mock_prepare, mock_cast, router1
+ ):
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._add_router_to_l3_agent(hosta_id,
+ router1['router']['id'])
+ routers = [router1['router']['id']]
+ mock_prepare.assert_called_with(server='hosta')
+ mock_cast.assert_called_with(
+ mock.ANY, 'router_added_to_agent', payload=routers)
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'l3_agent.router.add'
self._assert_notify(notifications, expected_event_type)
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
- with mock.patch.object(l3_notifier, 'cast') as mock_l3:
- with self.router() as router1:
- self._register_agent_states()
- hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
- L3_HOSTA)
- self._add_router_to_l3_agent(hosta_id,
- router1['router']['id'])
- self._remove_router_from_l3_agent(hosta_id,
- router1['router']['id'])
- mock_l3.assert_called_with(
- mock.ANY, l3_notifier.make_msg(
- 'router_removed_from_agent',
- payload={'router_id': router1['router']['id']}),
- topic='l3_agent.hosta')
+ with contextlib.nested(
+ mock.patch.object(l3_notifier.client, 'prepare',
+ return_value=l3_notifier.client),
+ mock.patch.object(l3_notifier.client, 'cast'),
+ self.router(),
+ ) as (
+ mock_prepare, mock_cast, router1
+ ):
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._add_router_to_l3_agent(hosta_id,
+ router1['router']['id'])
+ self._remove_router_from_l3_agent(hosta_id,
+ router1['router']['id'])
+ mock_prepare.assert_called_with(server='hosta')
+ mock_cast.assert_called_with(
+ mock.ANY, 'router_removed_from_agent',
+ payload={'router_id': router1['router']['id']})
notifications = fake_notifier.NOTIFICATIONS
expected_event_type = 'l3_agent.router.remove'
self._assert_notify(notifications, expected_event_type)
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
- with mock.patch.object(l3_notifier, 'cast') as mock_l3:
+ with contextlib.nested(
+ mock.patch.object(l3_notifier.client, 'prepare',
+ return_value=l3_notifier.client),
+ mock.patch.object(l3_notifier.client, 'cast'),
+ ) as (
+ mock_prepare, mock_cast
+ ):
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
- mock_l3.assert_called_with(
- mock.ANY, l3_notifier.make_msg(
- 'agent_updated', payload={'admin_state_up': False}),
- topic='l3_agent.hosta')
+
+ mock_prepare.assert_called_with(server='hosta')
+
+ mock_cast.assert_called_with(
+ mock.ANY, 'agent_updated', payload={'admin_state_up': False})