return
try:
- router = self._get_router(context, router_id)
+ # using admin context as router may belong to admin tenant
+ router = self._get_router(context.elevated(), router_id)
except l3.RouterNotFound:
- # TODO(obondarev): bug 1507602 was filed to investigate race
- # condition here. For now we preserve original behavior and do
- # broad notification
LOG.warning(_LW("Router %s was not found. "
- "Doing broad notification."),
+ "Skipping agent notification."),
router_id)
- self.notify_router_updated(context, router_id)
return
if is_distributed_router(router):
self.assertIsNotNone(body['floatingip']['fixed_ip_address'])
self.assertIsNotNone(body['floatingip']['router_id'])
+ def test_create_floatingip_non_admin_context_agent_notification(self):
+ plugin = manager.NeutronManager.get_service_plugins()[
+ service_constants.L3_ROUTER_NAT]
+ if not hasattr(plugin, 'l3_rpc_notifier'):
+ self.skipTest("Plugin does not support l3_rpc_notifier")
+
+ with self.subnet(cidr='11.0.0.0/24') as public_sub,\
+ self.port() as private_port,\
+ self.router() as r:
+ self._set_net_external(public_sub['subnet']['network_id'])
+ subnet_id = private_port['port']['fixed_ips'][0]['subnet_id']
+ private_sub = {'subnet': {'id': subnet_id}}
+
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ public_sub['subnet']['network_id'])
+ self._router_interface_action(
+ 'add', r['router']['id'],
+ private_sub['subnet']['id'], None)
+
+ with mock.patch.object(plugin.l3_rpc_notifier,
+ 'routers_updated') as agent_notification:
+ self._make_floatingip(
+ self.fmt,
+ public_sub['subnet']['network_id'],
+ port_id=private_port['port']['id'],
+ set_context=True)
+ self.assertTrue(agent_notification.called)
+
def test_floating_port_status_not_applicable(self):
with self.floatingip_with_assoc():
port_body = self._list('ports',