def _test_notify_op_agent(self, target_func, *args):
l3_rpc_agent_api_str = (
'neutron.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
- plugin = manager.NeutronManager.get_service_plugins()[
- service_constants.L3_ROUTER_NAT]
- oldNotify = plugin.l3_rpc_notifier
- try:
- with mock.patch(l3_rpc_agent_api_str) as notifyApi:
- plugin.l3_rpc_notifier = notifyApi
- kargs = [item for item in args]
- kargs.append(notifyApi)
- target_func(*kargs)
- except Exception:
- plugin.l3_rpc_notifier = oldNotify
- raise
- else:
- plugin.l3_rpc_notifier = oldNotify
+ with mock.patch(l3_rpc_agent_api_str):
+ plugin = manager.NeutronManager.get_service_plugins()[
+ service_constants.L3_ROUTER_NAT]
+ notifyApi = plugin.l3_rpc_notifier
+ kargs = [item for item in args]
+ kargs.append(notifyApi)
+ target_func(*kargs)
def _test_router_gateway_op_agent(self, notifyApi):
with self.router() as r: