req.query_string,
''))
- h = httplib2.Http(ca_certs=self.conf.auth_ca_cert,
- disable_ssl_certificate_validation=
- self.conf.nova_metadata_insecure)
+ h = httplib2.Http(
+ ca_certs=self.conf.auth_ca_cert,
+ disable_ssl_certificate_validation=self.conf.nova_metadata_insecure
+ )
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
h.add_certificate(self.conf.nova_client_priv_key,
self.conf.nova_client_cert,
# If we find an invalid rule in the list we
# do not perform the update since this breaks
# the integrity of this list.
- raise firewall.FirewallRuleNotFound(firewall_rule_id=
- fwrule_id)
+ raise firewall.FirewallRuleNotFound(
+ firewall_rule_id=fwrule_id)
elif rules_dict[fwrule_id]['firewall_policy_id']:
if (rules_dict[fwrule_id]['firewall_policy_id'] !=
fwp_db['id']):
status = (const.CREATED
if cfg.CONF.router_distributed else const.PENDING_CREATE)
with context.session.begin(subtransactions=True):
- firewall_db = Firewall(id=uuidutils.generate_uuid(),
- tenant_id=tenant_id,
- name=fw['name'],
- description=fw['description'],
- firewall_policy_id=
- fw['firewall_policy_id'],
- admin_state_up=fw['admin_state_up'],
- status=status)
+ firewall_db = Firewall(
+ id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ name=fw['name'],
+ description=fw['description'],
+ firewall_policy_id=fw['firewall_policy_id'],
+ admin_state_up=fw['admin_state_up'],
+ status=status)
context.session.add(firewall_db)
return self._make_firewall_dict(firewall_db)
dst_port_min, dst_port_max = self._get_min_max_ports_from_range(
fwr['destination_port'])
with context.session.begin(subtransactions=True):
- fwr_db = FirewallRule(id=uuidutils.generate_uuid(),
- tenant_id=tenant_id,
- name=fwr['name'],
- description=fwr['description'],
- shared=fwr['shared'],
- protocol=fwr['protocol'],
- ip_version=fwr['ip_version'],
- source_ip_address=fwr['source_ip_address'],
- destination_ip_address=
- fwr['destination_ip_address'],
- source_port_range_min=src_port_min,
- source_port_range_max=src_port_max,
- destination_port_range_min=dst_port_min,
- destination_port_range_max=dst_port_max,
- action=fwr['action'],
- enabled=fwr['enabled'])
+ fwr_db = FirewallRule(
+ id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ name=fwr['name'],
+ description=fwr['description'],
+ shared=fwr['shared'],
+ protocol=fwr['protocol'],
+ ip_version=fwr['ip_version'],
+ source_ip_address=fwr['source_ip_address'],
+ destination_ip_address=fwr['destination_ip_address'],
+ source_port_range_min=src_port_min,
+ source_port_range_max=src_port_max,
+ destination_port_range_min=dst_port_min,
+ destination_port_range_max=dst_port_max,
+ action=fwr['action'],
+ enabled=fwr['enabled'])
context.session.add(fwr_db)
return self._make_firewall_rule_dict(fwr_db)
cidrs = [r['remote_ip_prefix'] for r in r_ips]
new_cidr_ipset = netaddr.IPSet([remote_ip_prefix])
if (netaddr.IPSet(cidrs) & new_cidr_ipset):
- raise metering.MeteringLabelRuleOverlaps(remote_ip_prefix=
- remote_ip_prefix)
+ raise metering.MeteringLabelRuleOverlaps(
+ remote_ip_prefix=remote_ip_prefix)
def create_metering_label_rule(self, context, metering_label_rule):
m = metering_label_rule['metering_label_rule']
if 0 <= val <= 255:
return val
else:
- raise FirewallRuleInvalidProtocol(protocol=value,
- values=
- fw_valid_protocol_values)
+ raise FirewallRuleInvalidProtocol(
+ protocol=value,
+ values=fw_valid_protocol_values)
elif value.lower() in fw_valid_protocol_values:
return value.lower()
else:
- raise FirewallRuleInvalidProtocol(protocol=value,
- values=
- fw_valid_protocol_values)
+ raise FirewallRuleInvalidProtocol(
+ protocol=value,
+ values=fw_valid_protocol_values)
def convert_action_to_case_insensitive(value):
with context.session.begin(subtransactions=True):
hd_db = l3_models.HostingDevice(
id=hd.get('id') or uuidutils.generate_uuid(),
- complementary_id = hd.get('complementary_id'),
+ complementary_id=hd.get('complementary_id'),
tenant_id=tenant_id,
device_id=hd.get('device_id'),
admin_state_up=hd.get('admin_state_up', True),
profile_id=profile_id, profile_type=profile_type).delete()
new_tenants_set = set(tenants)
for tenant_id in new_tenants_set:
- tenant = n1kv_models_v2.ProfileBinding(profile_type = profile_type,
- tenant_id = tenant_id,
- profile_id = profile_id)
+ tenant = n1kv_models_v2.ProfileBinding(profile_type=profile_type,
+ tenant_id=tenant_id,
+ profile_id=profile_id)
db_session.add(tenant)
# stop normal patch
self.httpPatch.stop()
with patch(HTTPCON, new=fake_server.HTTPConnectionMock500):
- self._delete('ports', port['port']['id'],
- expected_code=
- webob.exc.HTTPInternalServerError.code)
+ self._delete(
+ 'ports',
+ port['port']['id'],
+ expected_code=webob.exc.HTTPInternalServerError.code)
self.httpPatch.start()
port = self._get_ports(n['network']['id'])[0]
self.assertEqual('BUILD', port['status'])
self.assertIsNotNone(hosting_device_1)
cfg_dh_rpc.report_non_responding_hosting_devices(
self.adminContext,
- host = None,
+ host=None,
hosting_device_ids=[hosting_device_1['id']])
self.assertEqual(1, mock_notify.call_count)
mock_notify.assert_called_with(
net_p_req = self.new_create_request('network_profiles', data)
net_p_req.environ['neutron.context'] = context.Context('',
self.tenant_id,
- is_admin = True)
+ is_admin=True)
res = net_p_req.get_response(self.ext_api)
self.assertEqual(201, res.status_int)
net_p = self.deserialize(self.fmt, res)
net_p['network_profile']['id'])
update_req.environ['neutron.context'] = context.Context('',
self.tenant_id,
- is_admin = True)
+ is_admin=True)
update_res = update_req.get_response(self.ext_api)
self.assertEqual(200, update_res.status_int)
db_session = db.get_session()
net_p['network_profile']['id'])
update_req.environ['neutron.context'] = context.Context('',
self.tenant_id,
- is_admin = True)
+ is_admin=True)
update_res = update_req.get_response(self.ext_api)
self.assertEqual(200, update_res.status_int)
# current tenant_id should always present
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- ADMIN_STATE_UP):
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=ADMIN_STATE_UP):
req = self.new_delete_request('firewall_policies', fwp_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 409)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(name=attrs['name'],
- firewall_policy_id=fwp_id,
- admin_state_up=
- ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ name=attrs['name'],
+ firewall_policy_id=fwp_id,
+ admin_state_up=ADMIN_STATE_UP
+ ) as firewall:
for k, v in attrs.iteritems():
self.assertEqual(firewall['firewall'][k], v)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(name=name,
- firewall_policy_id=fwp_id,
- admin_state_up=
- ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ name=name,
+ firewall_policy_id=fwp_id,
+ admin_state_up=ADMIN_STATE_UP) as firewall:
req = self.new_show_request('firewalls',
firewall['firewall']['id'],
fmt=self.fmt)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=ADMIN_STATE_UP) as firewall:
data = {'firewall': {'name': name}}
req = self.new_update_request('firewalls', data,
firewall['firewall']['id'])
tenant = '123'
# Make one of the switches throw an exception - i.e. fail
- self.drv._servers[0].runCmds = mock.Mock(side_effect = Exception())
+ self.drv._servers[0].runCmds = mock.Mock(side_effect=Exception)
self.drv.create_router(None, tenant, router)
def test_delete_router_when_one_switch_fails(self):
router_id = '345'
# Make one of the switches throw an exception - i.e. fail
- self.drv._servers[1].runCmds = mock.Mock(side_effect = Exception())
+ self.drv._servers[1].runCmds = mock.Mock(side_effect=Exception)
self.drv.delete_router(None, tenant, router_id, router)
def test_add_router_interface_when_one_switch_fails(self):
router['gip'] = '10.10.10.1'
# Make one of the switches throw an exception - i.e. fail
- self.drv._servers[1].runCmds = mock.Mock(side_effect = Exception())
+ self.drv._servers[1].runCmds = mock.Mock(side_effect=Exception)
self.drv.add_router_interface(None, router)
def test_remove_router_interface_when_one_switch_fails(self):
router['gip'] = '10.10.10.1'
# Make one of the switches throw an exception - i.e. fail
- self.drv._servers[0].runCmds = mock.Mock(side_effect = Exception())
+ self.drv._servers[0].runCmds = mock.Mock(side_effect=Exception)
self.drv.remove_router_interface(None, router)
(dp, ofp, ofpp) = br._get_dp()
call = mock.call
expected_calls = [
- call(ofpp.OFPFlowMod(dp, instructions=[
- ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS,
- [ofpp.OFPActionSetField(tunnel_id=112),
- ofpp.OFPActionOutput(port=113),
- ofpp.OFPActionOutput(port=114)]),
- ofpp.OFPInstructionGotoTable(table_id=111)],
- match=ofpp.OFPMatch(metadata=
- meta.mk_metadata(111, meta.LOCAL)),
- priority=1, table_id=110))
+ call(
+ ofpp.OFPFlowMod(
+ dp,
+ instructions=[
+ ofpp.OFPInstructionActions(
+ ofp.OFPIT_APPLY_ACTIONS,
+ [
+ ofpp.OFPActionSetField(tunnel_id=112),
+ ofpp.OFPActionOutput(port=113),
+ ofpp.OFPActionOutput(port=114)
+ ]
+ ),
+ ofpp.OFPInstructionGotoTable(table_id=111)
+ ],
+ match=ofpp.OFPMatch(
+ metadata=meta.mk_metadata(111, meta.LOCAL)
+ ),
+ priority=1,
+ table_id=110
+ )
+ )
]
sendmsg.assert_has_calls(expected_calls)
(dp, ofp, ofpp) = br._get_dp()
call = mock.call
expected_calls = [
- call(ofpp.OFPFlowMod(dp, command=ofp.OFPFC_DELETE,
- match=ofpp.OFPMatch(metadata=
- meta.mk_metadata(111, meta.LOCAL)),
- out_group=ofp.OFPG_ANY,
- out_port=ofp.OFPP_ANY, priority=0, table_id=110))
+ call(
+ ofpp.OFPFlowMod(
+ dp,
+ command=ofp.OFPFC_DELETE,
+ match=ofpp.OFPMatch(
+ metadata=meta.mk_metadata(111, meta.LOCAL)
+ ),
+ out_group=ofp.OFPG_ANY,
+ out_port=ofp.OFPP_ANY, priority=0, table_id=110
+ )
+ )
]
sendmsg.assert_has_calls(expected_calls)
match=ofpp.OFPMatch(in_port=99,
vlan_vid=151 | ofp.OFPVID_PRESENT),
priority=1, table_id=0)),
- call(ofpp.OFPFlowMod(dp, instructions=[
- ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
- ofpp.OFPActionPushVlan(),
- ofpp.OFPActionSetField(vlan_vid=151 | ofp.OFPVID_PRESENT),
- ofpp.OFPActionOutput(port=99), ofpp.OFPActionPopVlan()]),
- ofpp.OFPInstructionGotoTable(table_id=13)],
- match=ofpp.OFPMatch(metadata=
- meta.mk_metadata(150, meta.LOCAL)),
- priority=1, table_id=12))
+ call(
+ ofpp.OFPFlowMod(
+ dp,
+ instructions=[
+ ofpp.OFPInstructionActions(
+ ofp.OFPIT_APPLY_ACTIONS,
+ [
+ ofpp.OFPActionPushVlan(),
+ ofpp.OFPActionSetField(
+ vlan_vid=151 | ofp.OFPVID_PRESENT
+ ),
+ ofpp.OFPActionOutput(port=99),
+ ofpp.OFPActionPopVlan()
+ ]
+ ),
+ ofpp.OFPInstructionGotoTable(table_id=13)
+ ],
+ match=ofpp.OFPMatch(
+ metadata=meta.mk_metadata(150, meta.LOCAL)
+ ),
+ priority=1,
+ table_id=12
+ )
+ )
]
sendmsg.assert_has_calls(expected_calls)
ctx = context.get_admin_context()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as fw:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.set_firewall_status(ctx, fw_id,
const.ACTIVE,
ctx = context.get_admin_context()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as fw:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as fw:
fw_id = fw['firewall']['id']
fw_db = self.plugin._get_firewall(ctx, fw_id)
fw_db['status'] = const.PENDING_DELETE
res = req.get_response(self.ext_api)
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- tenant_id=tenant_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as fw:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ tenant_id=tenant_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.get_firewalls_for_tenant(ctx,
host='dummy')
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as firewall:
fw_id = firewall['firewall']['id']
res = self.callbacks.set_firewall_status(ctx, fw_id,
const.ACTIVE)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as firewall:
fw_id = firewall['firewall']['id']
data = {'firewall': {'name': name}}
req = self.new_update_request('firewalls', data, fw_id)
ctx = context.get_admin_context()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- tenant_id='noadmin') as firewall:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ tenant_id='noadmin'
+ ) as firewall:
fw_id = firewall['firewall']['id']
self.callbacks.set_firewall_status(ctx, fw_id,
const.ACTIVE)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP):
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ):
data = {'firewall_policy': {'name': name}}
req = self.new_update_request('firewall_policies',
data, fwp_id)
req = self.new_update_request('firewall_policies', data,
fwp_id)
req.get_response(self.ext_api)
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP):
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ):
data = {'firewall_rule': {'protocol': 'udp'}}
req = self.new_update_request('firewall_rules',
data, fr_id)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as firewall:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as firewall:
fw_id = firewall['firewall']['id']
attrs = self._replace_firewall_status(attrs,
const.PENDING_CREATE,
req.get_response(self.ext_api)
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP) as fw:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as fw:
fw_id = fw['firewall']['id']
fw_rules = (
self.plugin._make_firewall_dict_with_rules(ctx,
'test_interface',
'mac_addr',
namespace='test_ns')
- self.vif_driver.init_l3.assert_called_once_with('test_interface',
- ['10.0.0.2/24'],
- namespace=
- 'test_ns')
+ self.vif_driver.init_l3.assert_called_once_with(
+ 'test_interface',
+ ['10.0.0.2/24'],
+ namespace='test_ns'
+ )
cmd = ['route', 'add', 'default', 'gw', '10.0.0.1']
cmd_arping = ['arping', '-U', '-I',
'test_interface', '-c',
'test_interface',
'mac_addr',
namespace='test_ns')
- self.vif_driver.init_l3.assert_called_once_with('test_interface',
- ['10.0.0.2/24'],
- namespace=
- 'test_ns')
+ self.vif_driver.init_l3.assert_called_once_with(
+ 'test_interface',
+ ['10.0.0.2/24'],
+ namespace='test_ns'
+ )
self.assertFalse(ip_wrap.called)
dev_exists.return_value = True
self.assertRaises(exceptions.PreexistingDeviceFailure,
'test_interface',
'mac_addr',
namespace='test_ns')
- self.vif_driver.init_l3.assert_called_once_with('test_interface',
- ['10.0.0.2/24'],
- namespace=
- 'test_ns')
+ self.vif_driver.init_l3.assert_called_once_with(
+ 'test_interface',
+ ['10.0.0.2/24'],
+ namespace='test_ns'
+ )
cmd = ['route', 'add', 'default', 'gw', '10.0.0.1']
ip_wrap.assert_has_calls([
mock.call('sudo_test', namespace='test_ns'),
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
- instance.create_firewall_rule.assert_called_with(mock.ANY,
- firewall_rule=
- {'firewall_rule':
- expected_call_args})
+ instance.create_firewall_rule.assert_called_with(
+ mock.ANY,
+ firewall_rule={'firewall_rule': expected_call_args})
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
fmt=self.fmt),
self.serialize(update_data))
- instance.update_firewall_rule.assert_called_with(mock.ANY,
- rule_id,
- firewall_rule=
- update_data)
+ instance.update_firewall_rule.assert_called_with(
+ mock.ANY,
+ rule_id,
+ firewall_rule=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
- instance.create_firewall_policy.assert_called_with(mock.ANY,
- firewall_policy=
- data)
+ instance.create_firewall_policy.assert_called_with(
+ mock.ANY,
+ firewall_policy=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
fmt=self.fmt),
self.serialize(update_data))
- instance.update_firewall_policy.assert_called_with(mock.ANY,
- policy_id,
- firewall_policy=
- update_data)
+ instance.update_firewall_policy.assert_called_with(
+ mock.ANY,
+ policy_id,
+ firewall_policy=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
attrs['router_id'] = self._create_and_get_router()
- with self.firewall(name=name,
- firewall_policy_id=fwp_id,
- router_id=attrs['router_id'],
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- expected_res_status=201) as fw:
+ with self.firewall(
+ name=name,
+ firewall_policy_id=fwp_id,
+ router_id=attrs['router_id'],
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ expected_res_status=201
+ ) as fw:
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
attrs = self._get_test_firewall_attrs(name)
attrs['router_id'] = self._create_and_get_router()
- with self.firewall(name=name,
- router_id=attrs['router_id'],
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- expected_res_status=201) as fw:
+ with self.firewall(
+ name=name,
+ router_id=attrs['router_id'],
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ expected_res_status=201
+ ) as fw:
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
router_id = self._create_and_get_router()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(name='fw',
- firewall_policy_id=fwp_id,
- router_id=router_id,
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- expected_res_status=201):
+ with self.firewall(
+ name='fw',
+ firewall_policy_id=fwp_id,
+ router_id=router_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ expected_res_status=201
+ ):
self._delete('routers', router_id,
expected_code=webob.exc.HTTPConflict.code)
firewall_rules=None,
audited=test_db_firewall.AUDITED) as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(firewall_policy_id=fwp_id,
- router_id=self._create_and_get_router(),
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP):
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ router_id=self._create_and_get_router(),
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ):
data = {'firewall_policy': {'name': name}}
req = self.new_update_request(
'firewall_policies', data, fwp['firewall_policy']['id'])
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
- with self.firewall(firewall_policy_id=fwp_id,
- router_id=self._create_and_get_router(),
- admin_state_up=
- test_db_firewall.ADMIN_STATE_UP):
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ router_id=self._create_and_get_router(),
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ):
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request(
# E126 continuation line over-indented for hanging indent
# E128 continuation line under-indented for visual indent
# E129 visually indented line with same indent as next logical line
-# E251 unexpected spaces around keyword / parameter equals
# E265 block comment should start with ‘# ‘
# E713 test for membership should be ‘not in’
# F402 import module shadowed by loop variable
# H405 multi line docstring summary not separated with an empty line
# H904 Wrap long lines in parentheses instead of a backslash
# TODO(marun) H404 multi line docstring should start with a summary
-ignore = E125,E126,E128,E129,E251,E265,E713,F402,F811,F812,H104,H237,H305,H307,H401,H402,H404,H405,H904
+ignore = E125,E126,E128,E129,E265,E713,F402,F811,F812,H104,H237,H305,H307,H401,H402,H404,H405,H904
show-source = true
builtins = _
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,tools,.ropeproject,rally-scenarios