from neutron.api.v2 import resource
from neutron.common import constants
from neutron.common import exceptions
+from neutron.common import rpc as n_rpc
from neutron.extensions import agent
from neutron import manager
from neutron.openstack.common import log as logging
policy.enforce(request.context,
"create_%s" % L3_ROUTER,
{})
- return plugin.add_router_to_l3_agent(
- request.context,
- kwargs['agent_id'],
- body['router_id'])
+ agent_id = kwargs['agent_id']
+ router_id = body['router_id']
+ result = plugin.add_router_to_l3_agent(request.context, agent_id,
+ router_id)
+ notify(request.context, 'l3_agent.router.add', router_id, agent_id)
+ return result
def delete(self, request, id, **kwargs):
plugin = self.get_plugin()
policy.enforce(request.context,
"delete_%s" % L3_ROUTER,
{})
- return plugin.remove_router_from_l3_agent(
- request.context, kwargs['agent_id'], id)
+ agent_id = kwargs['agent_id']
+ result = plugin.remove_router_from_l3_agent(request.context, agent_id,
+ id)
+ notify(request.context, 'l3_agent.router.remove', id, agent_id)
+ return result
class L3AgentsHostingRouterController(wsgi.Controller):
@abc.abstractmethod
def list_l3_agents_hosting_router(self, context, router_id):
pass
+
+
+def notify(context, action, router_id, agent_id):
+ info = {'id': agent_id, 'router_id': router_id}
+ notifier = n_rpc.get_notifier('router')
+ notifier.info(context, action, {'agent': info})
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
+from neutron.tests import fake_notifier
from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_extensions
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, expected_code)
+ def _assert_notify(self, notifications, expected_event_type):
+ event_types = [event['event_type'] for event in notifications]
+ self.assertIn(expected_event_type, event_types)
+
def _register_one_agent_state(self, agent_state):
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
self.l3agentscheduler_dbMinxin = (
manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT))
+ self.notify_p = mock.patch(
+ 'neutron.extensions.l3agentscheduler.notify')
+ self.patched_notify = self.notify_p.start()
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.restore_attribute_map)
+ fake_notifier.reset()
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
'router_added_to_agent',
payload=routers),
topic='l3_agent.hosta')
+ notifications = fake_notifier.NOTIFICATIONS
+ expected_event_type = 'l3_agent.router.add'
+ self._assert_notify(notifications, expected_event_type)
def test_router_remove_from_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
'router_removed_from_agent',
payload={'router_id': router1['router']['id']}),
topic='l3_agent.hosta')
+ notifications = fake_notifier.NOTIFICATIONS
+ expected_event_type = 'l3_agent.router.remove'
+ self._assert_notify(notifications, expected_event_type)
def test_agent_updated_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()