def delete_nat_rules_by_match(cluster, router_id, rule_type,
max_num_expected,
min_num_expected=0,
+ raise_on_len_mismatch=True,
**kwargs):
# remove nat rules
nat_rules = query_nat_rules(cluster, router_id)
break
else:
to_delete_ids.append(r['uuid'])
- if not (len(to_delete_ids) in
- range(min_num_expected, max_num_expected + 1)):
- raise nsx_exc.NatRuleMismatch(actual_rules=len(to_delete_ids),
- min_rules=min_num_expected,
- max_rules=max_num_expected)
+ num_rules_to_delete = len(to_delete_ids)
+ if (num_rules_to_delete < min_num_expected or
+ num_rules_to_delete > max_num_expected):
+ if raise_on_len_mismatch:
+ raise nsx_exc.NatRuleMismatch(actual_rules=num_rules_to_delete,
+ min_rules=min_num_expected,
+ max_rules=max_num_expected)
+ else:
+ LOG.warn(_("Found %(actual_rule_num)d matching NAT rules, which "
+ "is not in the expected range (%(min_exp_rule_num)d,"
+ "%(max_exp_rule_num)d)"),
+ {'actual_rule_num': num_rules_to_delete,
+ 'min_exp_rule_num': min_num_expected,
+ 'max_exp_rule_num': max_num_expected})
for rule_id in to_delete_ids:
delete_router_nat_rule(cluster, router_id, rule_id)
+ # Return number of deleted rules - useful at least for
+ # testing purposes
+ return num_rules_to_delete
def delete_router_nat_rule(cluster, router_id, rule_id):
routerlib.delete_nat_rules_by_match(
self.cluster, nsx_router_id, "SourceNatRule",
max_num_expected=1, min_num_expected=0,
+ raise_on_len_mismatch=False,
source_ip_addresses=cidr)
if add_snat_rules:
ip_addresses = self._build_ip_address_list(
routerlib.delete_nat_rules_by_match(
self.cluster, nsx_router_id, "SourceNatRule",
max_num_expected=1, min_num_expected=1,
+ raise_on_len_mismatch=False,
source_ip_addresses=subnet['cidr'])
def add_router_interface(self, context, router_id, interface_info):
routerlib.delete_nat_rules_by_match(
self.cluster, nsx_router_id, "NoSourceNatRule",
max_num_expected=1, min_num_expected=0,
+ raise_on_len_mismatch=False,
destination_ip_addresses=subnet['cidr'])
except n_exc.NotFound:
LOG.error(_("Logical router resource %s not found "
routerlib.delete_nat_rules_by_match,
self.fake_cluster, lrouter['uuid'],
'SomeWeirdType', 1, 1)
+
+ def test_delete_nat_rules_by_match_len_mismatch_does_not_raise(self):
+ lrouter = self._prepare_nat_rules_for_delete_tests()
+ rules = routerlib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ deleted_rules = routerlib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'],
+ 'DestinationNatRule',
+ max_num_expected=1, min_num_expected=1,
+ raise_on_len_mismatch=False,
+ destination_ip_addresses='99.99.99.99')
+ self.assertEqual(0, deleted_rules)
+ # add an extra rule to emulate a duplicate one
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_version',
+ new=lambda: version_module.Version('2.0')):
+ routerlib.create_lrouter_snat_rule(
+ self.fake_cluster, lrouter['uuid'],
+ '10.0.0.2', '10.0.0.2', order=220,
+ match_criteria={'source_ip_addresses': '192.168.0.0/24'})
+ deleted_rules_2 = routerlib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'], 'SourceNatRule',
+ min_num_expected=1, max_num_expected=1,
+ raise_on_len_mismatch=False,
+ source_ip_addresses='192.168.0.0/24')
+ self.assertEqual(2, deleted_rules_2)
# limitations under the License.
import contextlib
+import uuid
import mock
import netaddr
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
- def test_remove_router_interface_not_in_nsx(self):
+ def _test_remove_router_interface_nsx_out_of_sync(self, unsync_action):
+ # Create external network and subnet
+ ext_net_id = self._create_network_and_subnet('1.1.1.0/24', True)[0]
# Create internal network and subnet
int_sub_id = self._create_network_and_subnet('10.0.0.0/24')[1]
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)
- # Add interface to router (needed to generate NAT rule)
+ # Set gateway and add interface to router (needed to generate NAT rule)
+ req = self.new_update_request(
+ 'routers',
+ {'router': {'external_gateway_info':
+ {'network_id': ext_net_id}}},
+ router['router']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 200)
req = self.new_action_request(
'routers',
{'subnet_id': int_sub_id},
"add_router_interface")
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
- self.fc._fake_lrouter_dict.clear()
+ unsync_action()
req = self.new_action_request(
'routers',
{'subnet_id': int_sub_id},
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
+ def test_remove_router_interface_not_in_nsx(self):
+
+ def unsync_action():
+ self.fc._fake_lrouter_dict.clear()
+ self.fc._fake_lrouter_nat_dict.clear()
+
+ self._test_remove_router_interface_nsx_out_of_sync(unsync_action)
+
+ def test_remove_router_interface_nat_rule_not_in_nsx(self):
+ self._test_remove_router_interface_nsx_out_of_sync(
+ self.fc._fake_lrouter_nat_dict.clear)
+
+ def test_remove_router_interface_duplicate_nat_rules_in_nsx(self):
+
+ def unsync_action():
+ # duplicate every entry in the nat rule dict
+ for (_rule_id, rule) in self.fc._fake_lrouter_nat_dict.items():
+ self.fc._fake_lrouter_nat_dict[uuid.uuid4()] = rule
+
+ self._test_remove_router_interface_nsx_out_of_sync(unsync_action)
+
def test_update_router_not_in_nsx(self):
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)