'status': vpnservice['status']}
return self._fields(res, fields)
+ def _check_router(self, context, router_id):
+ l3_plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.L3_ROUTER_NAT)
+ l3_plugin.get_router(context, router_id)
+
+ def _check_subnet_id(self, context, router_id, subnet_id):
+ core_plugin = manager.NeutronManager.get_plugin()
+ ports = core_plugin.get_ports(
+ context,
+ filters={
+ 'fixed_ips': {'subnet_id': [subnet_id]},
+ 'device_id': [router_id]})
+ if not ports:
+ raise vpnaas.SubnetIsNotConnectedToRouter(
+ subnet_id=subnet_id,
+ router_id=router_id)
+
def create_vpnservice(self, context, vpnservice):
vpns = vpnservice['vpnservice']
tenant_id = self._get_tenant_id_for_create(context, vpns)
+ self._check_router(context, vpns['router_id'])
+ self._check_subnet_id(context, vpns['router_id'], vpns['subnet_id'])
with context.session.begin(subtransactions=True):
vpnservice_db = VPNService(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
admin_state_up,
router_id, subnet_id,
expected_res_status=None, **kwargs):
+ tenant_id = kwargs.get('tenant_id', self._tenant_id)
data = {'vpnservice': {'name': name,
'subnet_id': subnet_id,
'router_id': router_id,
'admin_state_up': admin_state_up,
- 'tenant_id': self._tenant_id}}
+ 'tenant_id': tenant_id}}
for arg in ['description']:
if arg in kwargs and kwargs[arg] is not None:
data['vpnservice'][arg] = kwargs[arg]
vpnservice_req = self.new_create_request('vpnservices', data, fmt)
+ if (kwargs.get('set_context') and
+ 'tenant_id' in kwargs):
+ # create a specific auth context for this request
+ vpnservice_req.environ['neutron.context'] = context.Context(
+ '', kwargs['tenant_id'])
vpnservice_res = vpnservice_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(vpnservice_res.status_int, expected_res_status)
subnet=None,
router=None,
admin_state_up=True,
- no_delete=False, **kwargs):
+ no_delete=False,
+ plug_subnet=True, **kwargs):
if not fmt:
fmt = self.fmt
with test_db_plugin.optional_ctx(subnet, self.subnet) as tmp_subnet:
with test_db_plugin.optional_ctx(router,
self.router) as tmp_router:
+ if plug_subnet:
+ self._router_interface_action(
+ 'add',
+ tmp_router['router']['id'],
+ tmp_subnet['subnet']['id'], None)
+
res = self._create_vpnservice(fmt,
name,
admin_state_up,
if not no_delete:
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
+ if plug_subnet:
+ self._router_interface_action(
+ 'remove',
+ tmp_router['router']['id'],
+ tmp_subnet['subnet']['id'], None)
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
vpnservice['vpnservice'].items()
if k in expected),
expected)
- return vpnservice
+
+ def test_create_vpnservice_with_invalid_router(self):
+ """Test case to create a vpnservice with invalid router"""
+ with self.network(
+ set_context=True,
+ tenant_id='tenant_a') as network:
+ with self.subnet(network=network,
+ cidr='10.2.0.0/24') as subnet:
+ with self.router(
+ set_context=True, tenant_id='tenant_a') as router:
+ router_id = router['router']['id']
+ subnet_id = subnet['subnet']['id']
+ self._create_vpnservice(
+ self.fmt, 'fake',
+ True, router_id, subnet_id,
+ expected_res_status=webob.exc.HTTPNotFound.code,
+ set_context=True, tenant_id='tenant_b')
+
+ def test_create_vpnservice_with_nonconnected_subnet(self):
+ """Test case to create a vpnservice with nonconnected subnet."""
+ with self.network() as network:
+ with self.subnet(network=network,
+ cidr='10.2.0.0/24') as subnet:
+ with self.router() as router:
+ router_id = router['router']['id']
+ subnet_id = subnet['subnet']['id']
+ self._create_vpnservice(
+ self.fmt, 'fake',
+ True, router_id, subnet_id,
+ expected_res_status=webob.exc.HTTPBadRequest.code)
def _set_active(self, model, resource_id):
service_plugin = manager.NeutronManager.get_service_plugins()[
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
- router=router),
+ router=router,
+ plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
- router=router)
+ router=router,
+ plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_sort('vpnservice', (vpnservice3,
vpnservice2,
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
- router=router),
+ router=router,
+ plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
- router=router)
+ router=router,
+ plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination('vpnservice',
(vpnservice1,
router=router),
self.vpnservice(name='vpnservice2',
subnet=subnet,
- router=router),
+ router=router,
+ plug_subnet=False),
self.vpnservice(name='vpnservice3',
subnet=subnet,
- router=router)
+ router=router,
+ plug_subnet=False)
) as(vpnservice1, vpnservice2, vpnservice3):
self._test_list_with_pagination_reverse('vpnservice',
(vpnservice1,