mock.ANY, ri.router_id,
{fip_id: l3_constants.FLOATINGIP_STATUS_ERROR})
+ def test_process_external_iptables_exception(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ with mock.patch.object(
+ agent.plugin_rpc,
+ 'update_floatingip_statuses') as mock_update_fip_status:
+ fip_id = _uuid()
+ router = prepare_router_data(num_internal_ports=1)
+ router[l3_constants.FLOATINGIP_KEY] = [
+ {'id': fip_id,
+ 'floating_ip_address': '8.8.8.8',
+ 'fixed_ip_address': '7.7.7.7',
+ 'port_id': router[l3_constants.INTERFACE_KEY][0]['id']}]
+
+ ri = l3router.RouterInfo(router['id'], router, **self.ri_kwargs)
+ ri.iptables_manager._apply = mock.Mock(side_effect=Exception)
+ agent._process_external(ri)
+ # Assess the call for putting the floating IP into Error
+ # was performed
+ mock_update_fip_status.assert_called_once_with(
+ mock.ANY, ri.router_id,
+ {fip_id: l3_constants.FLOATINGIP_STATUS_ERROR})
+
+ self.assertEqual(ri.iptables_manager._apply.call_count, 1)
+
def test_handle_router_snat_rules_distributed_without_snat_manager(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
ri = dvr_router.DvrRouter(
+ agent,
HOSTNAME,
'foo_router_id',
{'distributed': True},