# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
- def test_plugin(self):
- self._make_network('json',
- 'some_net',
- True,
- tenant_id=self.tenant_id,
- set_context=True)
-
- req = self.new_list_request('networks', params="fields=tenant_id")
- req.environ['neutron.context'] = context.Context('', self.tenant_id)
- res = req.get_response(self.api)
- self.assertEqual(res.status_int, 200)
- body = self.deserialize('json', res)
- self.assertIn('tenant_id', body['networks'][0])
-
class TestN1kvNetworkProfiles(N1kvPluginTestCase):
def _prepare_net_profile_data(self,
class TestN1kvNetworks(test_plugin.TestNetworksV2,
N1kvPluginTestCase):
+ def test_plugin(self):
+ self._make_network('json',
+ 'some_net',
+ True,
+ tenant_id=self.tenant_id,
+ set_context=True)
+
+ req = self.new_list_request('networks', params="fields=tenant_id")
+ req.environ['neutron.context'] = context.Context('', self.tenant_id)
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 200)
+ body = self.deserialize('json', res)
+ self.assertIn('tenant_id', body['networks'][0])
+
def _prepare_net_data(self, net_profile_id):
return {'network': {'name': 'net1',
n1kv.PROFILE_ID: net_profile_id,
super(OpenDaylightTestCase, self).setUp(PLUGIN_NAME)
self.port_create_status = 'DOWN'
- self.segment = {'api.NETWORK_TYPE': ""}
self.mech = mechanism_odl.OpenDaylightMechanismDriver()
mechanism_odl.OpenDaylightMechanismDriver.sendjson = (
self.check_sendjson)
def check_sendjson(self, method, urlpath, obj, ignorecodes=[]):
self.assertFalse(urlpath.startswith("http://"))
- def test_check_segment(self):
- """Validate the check_segment call."""
- self.segment[api.NETWORK_TYPE] = constants.TYPE_LOCAL
- self.assertTrue(self.mech.check_segment(self.segment))
- self.segment[api.NETWORK_TYPE] = constants.TYPE_FLAT
- self.assertFalse(self.mech.check_segment(self.segment))
- self.segment[api.NETWORK_TYPE] = constants.TYPE_VLAN
- self.assertTrue(self.mech.check_segment(self.segment))
- self.segment[api.NETWORK_TYPE] = constants.TYPE_GRE
- self.assertTrue(self.mech.check_segment(self.segment))
- self.segment[api.NETWORK_TYPE] = constants.TYPE_VXLAN
- self.assertTrue(self.mech.check_segment(self.segment))
- # Validate a network type not currently supported
- self.segment[api.NETWORK_TYPE] = 'mpls'
- self.assertFalse(self.mech.check_segment(self.segment))
-
class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase):
requests.codes.not_implemented):
self._test_delete_resource_postcommit(
'port', status_code, requests.exceptions.HTTPError)
+
+ def test_check_segment(self):
+ """Validate the check_segment call."""
+ segment = {'api.NETWORK_TYPE': ""}
+ segment[api.NETWORK_TYPE] = constants.TYPE_LOCAL
+ self.assertTrue(self.mech.check_segment(segment))
+ segment[api.NETWORK_TYPE] = constants.TYPE_FLAT
+ self.assertFalse(self.mech.check_segment(segment))
+ segment[api.NETWORK_TYPE] = constants.TYPE_VLAN
+ self.assertTrue(self.mech.check_segment(segment))
+ segment[api.NETWORK_TYPE] = constants.TYPE_GRE
+ self.assertTrue(self.mech.check_segment(segment))
+ segment[api.NETWORK_TYPE] = constants.TYPE_VXLAN
+ self.assertTrue(self.mech.check_segment(segment))
+ # Validate a network type not currently supported
+ segment[api.NETWORK_TYPE] = 'mpls'
+ self.assertFalse(self.mech.check_segment(segment))
from neutron.tests.unit import test_extension_extraroute as extraroute_test
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_extensions
-from neutron.tests.unit import test_l3_plugin
API_EXT_PATH = os.path.dirname(extensions.__file__)
self._check_response_no_portbindings(non_admin_port)
-class TestNuageL3NatTestCase(NuagePluginV2TestCase,
- test_l3_plugin.L3NatDBIntTestCase):
+class TestNuageExtrarouteTestCase(NuagePluginV2TestCase,
+ extraroute_test.ExtraRouteDBIntTestCase):
+
+ def test_router_update_with_dup_destination_address(self):
+ with self.router() as r:
+ with self.subnet(cidr='10.0.1.0/24') as s:
+ with self.port(subnet=s) as p:
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ routes = [{'destination': '135.207.0.0/16',
+ 'nexthop': '10.0.1.3'},
+ {'destination': '135.207.0.0/16',
+ 'nexthop': '10.0.1.5'}]
+
+ self._update('routers', r['router']['id'],
+ {'router': {'routes':
+ routes}},
+ expected_code=exc.HTTPBadRequest.code)
+
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ def test_router_update_on_external_port(self):
+ with self.router() as r:
+ with self.subnet(cidr='10.0.1.0/24') as s:
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ body = self._show('routers', r['router']['id'])
+ net_id = body['router']['external_gateway_info']['network_id']
+ self.assertEqual(net_id, s['subnet']['network_id'])
+ port_res = self._list_ports(
+ 'json',
+ 200,
+ s['subnet']['network_id'],
+ tenant_id=r['router']['tenant_id'],
+ device_own=constants.DEVICE_OWNER_ROUTER_GW)
+ port_list = self.deserialize('json', port_res)
+ # The plugin will create 1 port
+ self.assertEqual(2, len(port_list['ports']))
+
+ routes = [{'destination': '135.207.0.0/16',
+ 'nexthop': '10.0.1.3'}]
+
+ body = self._update('routers', r['router']['id'],
+ {'router': {'routes':
+ routes}})
+
+ body = self._show('routers', r['router']['id'])
+ self.assertEqual(routes,
+ body['router']['routes'])
+
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ body = self._show('routers', r['router']['id'])
+ gw_info = body['router']['external_gateway_info']
+ self.assertIsNone(gw_info)
+
+ def test_floatingip_create_different_fixed_ip_same_port(self):
+ self._test_floatingip_create_different_fixed_ip_same_port()
+
+ def test_floatingip_update_different_router(self):
+ self._test_floatingip_update_different_router()
+
+ def test_floatingip_update_different_fixed_ip_same_port(self):
+ self._test_floatingip_update_different_fixed_ip_same_port()
+
+ def test_network_update_external_failure(self):
+ self._test_network_update_external_failure()
def test_update_port_with_assoc_floatingip(self):
with self.subnet(cidr='200.0.0.0/24') as public_sub:
self.assertEqual(exc.HTTPCreated.code, router_res.status_int)
-class TestNuageExtrarouteTestCase(NuagePluginV2TestCase,
- extraroute_test.ExtraRouteDBIntTestCase):
-
- def test_router_update_with_dup_destination_address(self):
- with self.router() as r:
- with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s) as p:
- self._router_interface_action('add',
- r['router']['id'],
- None,
- p['port']['id'])
-
- routes = [{'destination': '135.207.0.0/16',
- 'nexthop': '10.0.1.3'},
- {'destination': '135.207.0.0/16',
- 'nexthop': '10.0.1.5'}]
-
- self._update('routers', r['router']['id'],
- {'router': {'routes':
- routes}},
- expected_code=exc.HTTPBadRequest.code)
-
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
-
- def test_router_update_on_external_port(self):
- with self.router() as r:
- with self.subnet(cidr='10.0.1.0/24') as s:
- self._set_net_external(s['subnet']['network_id'])
- self._add_external_gateway_to_router(
- r['router']['id'],
- s['subnet']['network_id'])
- body = self._show('routers', r['router']['id'])
- net_id = body['router']['external_gateway_info']['network_id']
- self.assertEqual(net_id, s['subnet']['network_id'])
- port_res = self._list_ports(
- 'json',
- 200,
- s['subnet']['network_id'],
- tenant_id=r['router']['tenant_id'],
- device_own=constants.DEVICE_OWNER_ROUTER_GW)
- port_list = self.deserialize('json', port_res)
- # The plugin will create 1 port
- self.assertEqual(2, len(port_list['ports']))
-
- routes = [{'destination': '135.207.0.0/16',
- 'nexthop': '10.0.1.3'}]
-
- body = self._update('routers', r['router']['id'],
- {'router': {'routes':
- routes}})
-
- body = self._show('routers', r['router']['id'])
- self.assertEqual(routes,
- body['router']['routes'])
-
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s['subnet']['network_id'])
- body = self._show('routers', r['router']['id'])
- gw_info = body['router']['external_gateway_info']
- self.assertIsNone(gw_info)
-
- def test_floatingip_create_different_fixed_ip_same_port(self):
- self._test_floatingip_create_different_fixed_ip_same_port()
-
- def test_floatingip_update_different_router(self):
- self._test_floatingip_update_different_router()
-
- def test_floatingip_update_different_fixed_ip_same_port(self):
- self._test_floatingip_update_different_fixed_ip_same_port()
-
- def test_network_update_external_failure(self):
- self._test_network_update_external_failure()
-
-
class TestNuageProviderNetTestCase(NuagePluginV2TestCase):
def test_create_provider_network(self):