self._router = value
if not self._router:
return
+ # enable_snat by default if it wasn't specified by plugin
+ self._snat_enabled = self._router.get('enable_snat', True)
# Set a SNAT action for the router
if self._router.get('gw_port'):
- self._snat_action = (
- 'add_rules' if self._router.get('enable_snat')
- else 'remove_rules')
+ self._snat_action = ('add_rules' if self._snat_enabled
+ else 'remove_rules')
elif self.ex_gw_port:
# Gateway port was removed, remove rules
self._snat_action = 'remove_rules'
- self._snat_enabled = self._router.get('enable_snat')
def ns_name(self):
if self.use_namespaces:
else:
self.assertIn(r.rule, expected_rules)
- def _prepare_router_data(self, enable_snat=True, num_internal_ports=1):
+ def _prepare_router_data(self, enable_snat=None, num_internal_ports=1):
router_id = _uuid()
ex_gw_port = {'id': _uuid(),
'network_id': _uuid(),
router = {
'id': router_id,
l3_constants.INTERFACE_KEY: int_ports,
- 'enable_snat': enable_snat,
'routes': [],
'gw_port': ex_gw_port}
+ if enable_snat is not None:
+ router['enable_snat'] = enable_snat
return router
def testProcessRouter(self):
def test_process_router_snat_disabled(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- router = self._prepare_router_data()
+ router = self._prepare_router_data(enable_snat=True)
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
# Process with NAT
router = self._prepare_router_data(enable_snat=False)
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
- # Process with NAT
+ # Process without NAT
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
- # Reprocess without NAT
+ # Reprocess with NAT
router['enable_snat'] = True
# Reassign the router object to RouterInfo
ri.router = router