res = self._create_port(self.fmt, net_id, arg_list=args,
context=ctx, **port_dict)
port = self.deserialize(self.fmt, res)
- try:
- yield res
- finally:
- if do_delete:
- self._delete('ports', port['port']['id'])
+ yield res
+ if do_delete:
+ self._delete('ports', port['port']['id'])
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
res = self._create_network(self.fmt, net_name, True,
arg_list=arg_list, **provider_attrs)
network = self.deserialize(self.fmt, res)['network']
- try:
- yield network
- finally:
- req = self.new_delete_request('networks', network['id'])
- req.get_response(self.api)
+ yield network
+ req = self.new_delete_request('networks', network['id'])
+ req.get_response(self.api)
def test_create_provider_vlan_network(self):
with self._provider_vlan_network(PHYS_NET, '1234',
request = self.new_create_request('routers', data, self.fmt)
response = request.get_response(self.ext_api)
router = self.deserialize(self.fmt, response)
- try:
- yield network, subnet, router
- finally:
- self._delete('routers', router['router']['id'])
+ yield network, subnet, router
+ self._delete('routers', router['router']['id'])
@contextlib.contextmanager
def _router_interface(self, router, subnet, **kwargs):
router['router']['id'],
'add_router_interface')
response = request.get_response(self.ext_api)
- try:
- yield response
- finally:
- # If router interface was created successfully, delete it now.
- if response.status_int == wexc.HTTPOk.code:
- request = self.new_action_request('routers', interface_data,
- router['router']['id'],
- 'remove_router_interface')
- request.get_response(self.ext_api)
+
+ yield response
+
+ # If router interface was created successfully, delete it now.
+ if response.status_int == wexc.HTTPOk.code:
+ request = self.new_action_request('routers', interface_data,
+ router['router']['id'],
+ 'remove_router_interface')
+ request.get_response(self.ext_api)
@contextlib.contextmanager
def _network_subnet_router_interface(self, **kwargs):
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_policy = self.deserialize(fmt or self.fmt, res)
- try:
- yield firewall_policy
- finally:
- if not no_delete:
- self._delete('firewall_policies',
- firewall_policy['firewall_policy']['id'])
+ yield firewall_policy
+ if not no_delete:
+ self._delete('firewall_policies',
+ firewall_policy['firewall_policy']['id'])
def _create_firewall_rule(self, fmt, name, shared, protocol,
ip_version, source_ip_address,
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_rule = self.deserialize(fmt or self.fmt, res)
- try:
- yield firewall_rule
- finally:
- if not no_delete:
- self._delete('firewall_rules',
- firewall_rule['firewall_rule']['id'])
+ yield firewall_rule
+ if not no_delete:
+ self._delete('firewall_rules',
+ firewall_rule['firewall_rule']['id'])
def _create_firewall(self, fmt, name, description, firewall_policy_id,
admin_state_up=True, expected_res_status=None,
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall = self.deserialize(fmt or self.fmt, res)
- try:
- yield firewall
- finally:
- if not no_delete:
- self._delete('firewalls', firewall['firewall']['id'])
+ yield firewall
+ if not no_delete:
+ self._delete('firewalls', firewall['firewall']['id'])
def _rule_action(self, action, id, firewall_rule_id, insert_before=None,
insert_after=None, expected_code=webob.exc.HTTPOk.code,
explanation=_("Unexpected error code: %s") %
res.status_int
)
- try:
- vip = self.deserialize(fmt or self.fmt, res)
- yield vip
- finally:
- if not no_delete:
- self._delete('vips', vip['vip']['id'])
+ vip = self.deserialize(fmt or self.fmt, res)
+ yield vip
+ if not no_delete:
+ self._delete('vips', vip['vip']['id'])
@contextlib.contextmanager
def pool(self, fmt=None, name='pool1', lb_method='ROUND_ROBIN',
raise webob.exc.HTTPClientError(
explanation=_("Unexpected error code: %s") % res.status_int
)
- try:
- pool = self.deserialize(fmt or self.fmt, res)
- yield pool
- finally:
- if not no_delete:
- self._delete('pools', pool['pool']['id'])
+ pool = self.deserialize(fmt or self.fmt, res)
+ yield pool
+ if not no_delete:
+ self._delete('pools', pool['pool']['id'])
@contextlib.contextmanager
def member(self, fmt=None, address='192.168.1.100', protocol_port=80,
raise webob.exc.HTTPClientError(
explanation=_("Unexpected error code: %s") % res.status_int
)
- try:
- member = self.deserialize(fmt or self.fmt, res)
- yield member
- finally:
- if not no_delete:
- self._delete('members', member['member']['id'])
+ member = self.deserialize(fmt or self.fmt, res)
+ yield member
+ if not no_delete:
+ self._delete('members', member['member']['id'])
@contextlib.contextmanager
def health_monitor(self, fmt=None, type='TCP',
else:
for arg in http_related_attributes:
self.assertIsNone(the_health_monitor.get(arg))
- try:
- yield health_monitor
- finally:
- if not no_delete:
- self._delete('health_monitors', the_health_monitor['id'])
+ yield health_monitor
+ if not no_delete:
+ self._delete('health_monitors', the_health_monitor['id'])
class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,
fmt = self.fmt
metering_label = self._make_metering_label(fmt, name,
description, **kwargs)
- try:
- yield metering_label
- finally:
- if not no_delete:
- self._delete('metering-labels',
- metering_label['metering_label']['id'])
+ yield metering_label
+ if not no_delete:
+ self._delete('metering-labels',
+ metering_label['metering_label']['id'])
@contextlib.contextmanager
def metering_label_rule(self, metering_label_id=None, direction='ingress',
direction,
remote_ip_prefix,
excluded)
- try:
- yield metering_label_rule
- finally:
- if not no_delete:
- self._delete('metering-label-rules',
- metering_label_rule['metering_label_rule']['id'])
+ yield metering_label_rule
+ if not no_delete:
+ self._delete('metering-label-rules',
+ metering_label_rule['metering_label_rule']['id'])
class MeteringPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- try:
- ikepolicy = self.deserialize(fmt or self.fmt, res)
- yield ikepolicy
- finally:
- if not no_delete:
- self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
+ ikepolicy = self.deserialize(fmt or self.fmt, res)
+ yield ikepolicy
+ if not no_delete:
+ self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
def _create_ipsecpolicy(self, fmt,
name='ipsecpolicy1',
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- try:
- ipsecpolicy = self.deserialize(fmt or self.fmt, res)
- yield ipsecpolicy
- finally:
- if not no_delete:
- self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
+ ipsecpolicy = self.deserialize(fmt or self.fmt, res)
+ yield ipsecpolicy
+ if not no_delete:
+ self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
def _create_vpnservice(self, fmt, name,
admin_state_up,
'add',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
- try:
- res = self._create_vpnservice(fmt,
- name,
- admin_state_up,
- router_id=(tmp_router['router']
- ['id']),
- subnet_id=(tmp_subnet['subnet']
- ['id']),
- **kwargs)
- vpnservice = self.deserialize(fmt or self.fmt, res)
- if res.status_int >= 400:
- raise webob.exc.HTTPClientError(
- code=res.status_int, detail=vpnservice)
+
+ res = self._create_vpnservice(fmt,
+ name,
+ admin_state_up,
+ router_id=(tmp_router['router']
+ ['id']),
+ subnet_id=(tmp_subnet['subnet']
+ ['id']),
+ **kwargs)
+ vpnservice = self.deserialize(fmt or self.fmt, res)
+ if res.status_int < 400:
yield vpnservice
- finally:
- if not no_delete and vpnservice.get('vpnservice'):
- self._delete('vpnservices',
- vpnservice['vpnservice']['id'])
- if plug_subnet:
- self._router_interface_action(
- 'remove',
- tmp_router['router']['id'],
- tmp_subnet['subnet']['id'], None)
- if external_router:
- external_gateway = tmp_router['router'].get(
- 'external_gateway_info')
- if external_gateway:
- network_id = external_gateway['network_id']
- self._remove_external_gateway_from_router(
- tmp_router['router']['id'], network_id)
+
+ if not no_delete and vpnservice.get('vpnservice'):
+ self._delete('vpnservices',
+ vpnservice['vpnservice']['id'])
+ if plug_subnet:
+ self._router_interface_action(
+ 'remove',
+ tmp_router['router']['id'],
+ tmp_subnet['subnet']['id'], None)
+ if external_router:
+ external_gateway = tmp_router['router'].get(
+ 'external_gateway_info')
+ if external_gateway:
+ network_id = external_gateway['network_id']
+ self._remove_external_gateway_from_router(
+ tmp_router['router']['id'], network_id)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(
+ code=res.status_int, detail=vpnservice)
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- try:
- ipsec_site_connection = self.deserialize(
- fmt or self.fmt, res
+
+ ipsec_site_connection = self.deserialize(
+ fmt or self.fmt, res
+ )
+ yield ipsec_site_connection
+
+ if not no_delete:
+ self._delete(
+ 'ipsec-site-connections',
+ ipsec_site_connection[
+ 'ipsec_site_connection']['id']
)
- yield ipsec_site_connection
- finally:
- if not no_delete:
- self._delete(
- 'ipsec-site-connections',
- ipsec_site_connection[
- 'ipsec_site_connection']['id']
- )
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(
with test_plugin.optional_ctx(network, self.network) as network_to_use:
net_id = network_to_use['network']['id']
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
- try:
- yield pf
- finally:
- if do_delete:
- self._delete('packet_filters', pf['packet_filter']['id'])
+ yield pf
+ if do_delete:
+ self._delete('packet_filters', pf['packet_filter']['id'])
@contextlib.contextmanager
def packet_filter_on_port(self, port=None, fmt=None, do_delete=True,
kwargs['in_port'] = port_id
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
self.assertEqual(port_id, pf['packet_filter']['in_port'])
- try:
- yield pf
- finally:
- if do_delete:
- self._delete('packet_filters', pf['packet_filter']['id'])
+ yield pf
+ if do_delete:
+ self._delete('packet_filters', pf['packet_filter']['id'])
class TestNecPluginPacketFilter(TestNecPluginPacketFilterBase):
**kwargs):
netpart = self._make_netpartition(fmt or self.fmt, name)
- try:
- yield netpart
- finally:
- if do_delete:
- self._del_netpartition(netpart['net_partition']['id'])
+ yield netpart
+ if do_delete:
+ self._del_netpartition(netpart['net_partition']['id'])
def test_create_netpartition(self):
name = 'netpart1'
**kwargs):
network = self._make_network(fmt or self.fmt, name,
admin_state_up, **kwargs)
- try:
- yield network
- finally:
- if do_delete:
- # The do_delete parameter allows you to control whether the
- # created network is immediately deleted again. Therefore, this
- # function is also usable in tests, which require the creation
- # of many networks.
- self._delete('networks', network['network']['id'])
+ yield network
+ if do_delete:
+ # The do_delete parameter allows you to control whether the
+ # created network is immediately deleted again. Therefore, this
+ # function is also usable in tests, which require the creation
+ # of many networks.
+ self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
- try:
- yield subnet
- finally:
- if do_delete:
- self._delete('subnets', subnet['subnet']['id'])
+ yield subnet
+ if do_delete:
+ self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, no_delete=False,
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
- try:
- yield port
- finally:
- if not no_delete:
- self._delete('ports', port['port']['id'])
+ yield port
+ if not no_delete:
+ self._delete('ports', port['port']['id'])
def _test_list_with_sort(self, resource,
items, sorts, resources=None, query_params=''):
if not fmt:
fmt = self.fmt
security_group = self._make_security_group(fmt, name, description)
- try:
- yield security_group
- finally:
- if not no_delete:
- self._delete('security-groups',
- security_group['security_group']['id'])
+ yield security_group
+ if not no_delete:
+ self._delete('security-groups',
+ security_group['security_group']['id'])
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
remote_group_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
- try:
- yield security_group_rule
- finally:
- if not no_delete:
- self._delete('security-group-rules',
- security_group_rule['security_group_rule']['id'])
+ yield security_group_rule
+ if not no_delete:
+ self._delete('security-group-rules',
+ security_group_rule['security_group_rule']['id'])
def _delete_default_security_group_egress_rules(self, security_group_id):
"""Deletes default egress rules given a security group ID."""
router = self._make_router(fmt or self.fmt, tenant_id, name,
admin_state_up, external_gateway_info,
set_context, **kwargs)
- try:
- yield router
- finally:
- self._delete('routers', router['router']['id'])
+ yield router
+ self._delete('routers', router['router']['id'])
def _set_net_external(self, net_id):
self._update('networks', net_id,
sid = private_port['port']['fixed_ips'][0]['subnet_id']
private_sub = {'subnet': {'id': sid}}
floatingip = None
- try:
- self._add_external_gateway_to_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
- self._router_interface_action(
- 'add', r['router']['id'],
- private_sub['subnet']['id'], None)
- floatingip = self._make_floatingip(
- fmt or self.fmt,
- public_sub['subnet']['network_id'],
- port_id=private_port['port']['id'],
- fixed_ip=fixed_ip,
- set_context=False)
- yield floatingip
- finally:
- if floatingip:
- self._delete('floatingips',
- floatingip['floatingip']['id'])
- self._router_interface_action(
- 'remove', r['router']['id'],
- private_sub['subnet']['id'], None)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ public_sub['subnet']['network_id'])
+ self._router_interface_action(
+ 'add', r['router']['id'],
+ private_sub['subnet']['id'], None)
+
+ floatingip = self._make_floatingip(
+ fmt or self.fmt,
+ public_sub['subnet']['network_id'],
+ port_id=private_port['port']['id'],
+ fixed_ip=fixed_ip,
+ set_context=False)
+ yield floatingip
+
+ if floatingip:
+ self._delete('floatingips',
+ floatingip['floatingip']['id'])
+ self._router_interface_action(
+ 'remove', r['router']['id'],
+ private_sub['subnet']['id'], None)
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc_with_public_sub(
self._set_net_external(public_sub['subnet']['network_id'])
with self.router() as r:
floatingip = None
- try:
- self._add_external_gateway_to_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
- self._router_interface_action('add', r['router']['id'],
- private_sub['subnet']['id'],
- None)
- floatingip = self._make_floatingip(
- fmt or self.fmt,
- public_sub['subnet']['network_id'],
- set_context=set_context)
- yield floatingip, r
- finally:
- if floatingip:
- self._delete('floatingips',
- floatingip['floatingip']['id'])
- self._router_interface_action('remove', r['router']['id'],
- private_sub['subnet']['id'],
- None)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ public_sub['subnet']['network_id'])
+ self._router_interface_action('add', r['router']['id'],
+ private_sub['subnet']['id'],
+ None)
+
+ floatingip = self._make_floatingip(
+ fmt or self.fmt,
+ public_sub['subnet']['network_id'],
+ set_context=set_context)
+ yield floatingip, r
+
+ if floatingip:
+ self._delete('floatingips',
+ floatingip['floatingip']['id'])
+ self._router_interface_action('remove', r['router']['id'],
+ private_sub['subnet']['id'],
+ None)
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc(self, private_sub, fmt=None, set_context=False):
self._add_external_gateway_to_router(
router['router']['id'],
subnet['subnet']['network_id'])
- try:
- yield router
- finally:
- self._remove_external_gateway_from_router(
- router['router']['id'], subnet['subnet']['network_id'])
- self._delete('routers', router['router']['id'])
+
+ yield router
+
+ self._remove_external_gateway_from_router(
+ router['router']['id'], subnet['subnet']['network_id'])
+ self._delete('routers', router['router']['id'])
class L3AgentChanceSchedulerTestCase(L3SchedulerTestCase):
qos_queue = self.deserialize('json', res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- try:
- yield qos_queue
- finally:
- if not no_delete:
- self._delete('qos-queues',
- qos_queue['qos_queue']['id'])
+
+ yield qos_queue
+
+ if not no_delete:
+ self._delete('qos-queues',
+ qos_queue['qos_queue']['id'])
def test_create_qos_queue(self):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
routers.append(self._plugin.create_router(ctx, router(i)))
# Do not return anything as the user does need the actual
# data created
- try:
- yield
- finally:
- # Remove everything
- for router in routers:
- self._plugin.delete_router(ctx, router['id'])
- for port in ports:
- self._plugin.delete_port(ctx, port['id'])
- # This will remove networks and subnets
- for network in networks:
- self._plugin.delete_network(ctx, network['id'])
+ yield
+
+ # Remove everything
+ for router in routers:
+ self._plugin.delete_router(ctx, router['id'])
+ for port in ports:
+ self._plugin.delete_port(ctx, port['id'])
+ # This will remove networks and subnets
+ for network in networks:
+ self._plugin.delete_network(ctx, network['id'])
def _get_tag_dict(self, tags):
return dict((tag['scope'], tag['tag']) for tag in tags)
data = {'router': {'tenant_id': self._tenant_id}}
data['router']['service_router'] = True
router_req = self.new_create_request('routers', data, self.fmt)
- try:
- res = router_req.get_response(self.ext_api)
- router = self.deserialize(self.fmt, res)
- self._add_external_gateway_to_router(
- router['router']['id'],
- s['subnet']['network_id'])
- router = self._show('routers', router['router']['id'])
- yield router
- finally:
- self._delete('routers', router['router']['id'])
+
+ res = router_req.get_response(self.ext_api)
+ router = self.deserialize(self.fmt, res)
+ self._add_external_gateway_to_router(
+ router['router']['id'],
+ s['subnet']['network_id'])
+ router = self._show('routers', router['router']['id'])
+ yield router
+
+ self._delete('routers', router['router']['id'])
def test_create_vpnservice(self, **extras):
"""Test case to create a vpnservice."""