def _lsn_port_configure_action(
cluster, lsn_id, lsn_port_id, action, is_enabled, obj):
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODE_RESOURCE,
- resource_id=lsn_id,
- extra_action=action),
- json.dumps({"enabled": is_enabled}),
- cluster=cluster)
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id,
- extra_action=action),
- json.dumps(obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
+ resource_id=lsn_id,
+ extra_action=action),
+ json.dumps({"enabled": is_enabled}),
+ cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id,
+ extra_action=action),
+ json.dumps(obj),
+ cluster=cluster)
+
+
+ def _get_opts(name, value):
+ return {"name": name, "value": str(value)}
+def _get_opts(name, value):
+ return {"name": name, "value": str(value)}
+
+
def lsn_port_dhcp_configure(
cluster, lsn_id, lsn_port_id, is_enabled=True, dhcp_options=None):
dhcp_options = dhcp_options or {}
u'123', conn_data)
self.assertIsNotNone(connection)
self.assertFalse(connection.is_dirty)
+ self.assertEqual(1, self.conn_create.call_count)
self.assertFalse(connection.is_admin_up)
self.assertTrue(connection.forced_down)
- self.assertFalse(self.conn_create.called)
+ self.assertEqual(1, self.admin_state.call_count)
+
+ def test_update_connection_admin_up(self):
+ """Connection updated to admin up state - record."""
+ # Make existing service, and connection that was admin down
+ conn_data = copy.deepcopy(self.conn1_data)
+ conn_data.update({u'status': constants.DOWN, u'admin_state_up': False})
+ service_data = {u'id': u'123',
+ u'status': constants.DOWN,
+ u'external_ip': u'1.1.1.1',
+ u'admin_state_up': True,
+ u'ipsec_conns': [conn_data]}
+ self.driver.update_service(self.context, service_data)
+
+ # Simulate that notification of connection update received
+ self.driver.mark_existing_connections_as_dirty()
+ # Now simulate that the notification shows the connection admin up
+ new_conn_data = copy.deepcopy(conn_data)
+ new_conn_data[u'admin_state_up'] = True
+
+ connection = self.driver.update_connection(self.context,
+ u'123', new_conn_data)
+ self.assertFalse(connection.is_dirty)
+ self.assertEqual(u'Tunnel0', connection.tunnel)
+ self.assertEqual(constants.DOWN, connection.last_status)
+ self.assertTrue(connection.is_admin_up)
+ self.assertFalse(connection.forced_down)
+ self.assertEqual(2, self.admin_state.call_count)
+ def test_update_connection_admin_up(self):
+ """Connection updated to admin up state - record."""
+ # Make existing service, and connection that was admin down
+ conn_data = {u'id': '1', u'status': constants.DOWN,
+ u'admin_state_up': False,
+ u'cisco': {u'site_conn_id': u'Tunnel0'}}
+ service_data = {u'id': u'123',
+ u'status': constants.DOWN,
+ u'external_ip': u'1.1.1.1',
+ u'admin_state_up': True,
+ u'ipsec_conns': [conn_data]}
+ self.driver.update_service(self.context, service_data)
+ self.driver.mark_existing_connections_as_dirty()
+ # Now simulate that the notification shows the connection admin up
+ conn_data[u'admin_state_up'] = True
+ conn_data[u'status'] = constants.DOWN
+
+ connection = self.driver.update_connection(self.context,
+ u'123', conn_data)
+ self.assertFalse(connection.is_dirty)
+ self.assertFalse(connection.forced_down)
+ self.assertEqual(u'Tunnel0', connection.tunnel)
+ self.assertEqual(constants.DOWN, connection.last_status)
+ self.assertEqual(1, self.conn_create.call_count)
+
def test_update_for_vpn_service_create(self):
"""Creation of new IPSec connection on new VPN service - create.
if r not in orig_nat_rules]
self.assertEqual(len(nat_rules_delta), 1)
self._verify_snat_rules(nat_rules_delta, router)
- self.send_arp.assert_called_once()
+ # send_arp is called both times process_router is called
+ self.assertEqual(self.send_arp.call_count, 2)
+
+ def test_process_ipv6_only_gw(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ router = self._prepare_router_data(ip_version=6)
+ # Get NAT rules without the gw_port
+ gw_port = router['gw_port']
+ router['gw_port'] = None
+ ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+ self.conf.use_namespaces, router=router)
++ agent.external_gateway_added = mock.Mock()
++ agent.process_router(ri)
++ orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
++
++ # Get NAT rules with the gw_port
++ router['gw_port'] = gw_port
++ ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
++ self.conf.use_namespaces, router=router)
++ with mock.patch.object(
++ agent,
++ 'external_gateway_nat_rules') as external_gateway_nat_rules:
++ agent.process_router(ri)
++ new_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
++
++ # There should be no change with the NAT rules
++ self.assertFalse(external_gateway_nat_rules.called)
++ self.assertEqual(orig_nat_rules, new_nat_rules)
++
++ def test_process_router_ipv6_interface_added(self):
++ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
++ router = self._prepare_router_data()
++ ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
++ self.conf.use_namespaces, router=router)
++ agent.external_gateway_added = mock.Mock()
++ # Process with NAT
++ agent.process_router(ri)
++ orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
++ # Add an IPv6 interface and reprocess
++ router[l3_constants.INTERFACE_KEY].append(
++ {'id': _uuid(),
++ 'network_id': _uuid(),
++ 'admin_state_up': True,
++ 'fixed_ips': [{'ip_address': 'fd00::2',
++ 'subnet_id': _uuid()}],
++ 'mac_address': 'ca:fe:de:ad:be:ef',
++ 'subnet': {'cidr': 'fd00::/64',
++ 'gateway_ip': 'fd00::1'}})
++ # Reassign the router object to RouterInfo
++ ri.router = router
++ agent.process_router(ri)
++ # For some reason set logic does not work well with
++ # IpTablesRule instances
++ nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
++ if r not in orig_nat_rules]
++ self.assertFalse(nat_rules_delta)
++
++ def test_process_router_ipv6v4_interface_added(self):
++ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
++ router = self._prepare_router_data()
++ ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
++ self.conf.use_namespaces, router=router)
++ agent.external_gateway_added = mock.Mock()
++ # Process with NAT
++ agent.process_router(ri)
++ orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
++ # Add an IPv4 and IPv6 interface and reprocess
++ router[l3_constants.INTERFACE_KEY].append(
++ {'id': _uuid(),
++ 'network_id': _uuid(),
++ 'admin_state_up': True,
++ 'fixed_ips': [{'ip_address': '35.4.1.4',
++ 'subnet_id': _uuid()}],
++ 'mac_address': 'ca:fe:de:ad:be:ef',
++ 'subnet': {'cidr': '35.4.1.0/24',
++ 'gateway_ip': '35.4.1.1'}})
++
++ router[l3_constants.INTERFACE_KEY].append(
++ {'id': _uuid(),
++ 'network_id': _uuid(),
++ 'admin_state_up': True,
++ 'fixed_ips': [{'ip_address': 'fd00::2',
++ 'subnet_id': _uuid()}],
++ 'mac_address': 'ca:fe:de:ad:be:ef',
++ 'subnet': {'cidr': 'fd00::/64',
++ 'gateway_ip': 'fd00::1'}})
++ # Reassign the router object to RouterInfo
++ ri.router = router
++ agent.process_router(ri)
++ # For some reason set logic does not work well with
++ # IpTablesRule instances
++ nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
++ if r not in orig_nat_rules]
++ self.assertEqual(1, len(nat_rules_delta))
++ self._verify_snat_rules(nat_rules_delta, router)
+
+ def test_process_ipv6_only_gw(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ router = self._prepare_router_data(ip_version=6)
+ # Get NAT rules without the gw_port
+ gw_port = router['gw_port']
+ router['gw_port'] = None
+ ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+ self.conf.use_namespaces, router=router)
agent.external_gateway_added = mock.Mock()
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]