# Verify that default egress rules have been created
sg_rules = security_group['security_group']['security_group_rules']
- self.assertEqual(len(sg_rules), 2)
+ self.assertEqual(2, len(sg_rules))
v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
- self.assertEqual(len(v4_rules), 1)
+ self.assertEqual(1, len(v4_rules))
v4_rule = v4_rules[0]
expected = {'direction': 'egress',
'ethertype': const.IPv4,
self._assert_sg_rule_has_kvs(v4_rule, expected)
v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
- self.assertEqual(len(v6_rules), 1)
+ self.assertEqual(1, len(v6_rules))
v6_rule = v6_rules[0]
expected = {'direction': 'egress',
'ethertype': const.IPv6,
data,
sg['security_group']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
- self.assertEqual(res['security_group']['name'],
- data['security_group']['name'])
- self.assertEqual(res['security_group']['description'],
- data['security_group']['description'])
+ self.assertEqual(data['security_group']['name'],
+ res['security_group']['name'])
+ self.assertEqual(data['security_group']['description'],
+ res['security_group']['description'])
def test_update_security_group_name_to_default_fail(self):
with self.security_group() as sg:
sg['security_group']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_default_security_group_name_fail(self):
with self.network():
sg['security_groups'][0]['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
+ self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
def test_update_default_security_group_with_description(self):
with self.network():
data,
sg['security_groups'][0]['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
- self.assertEqual(res['security_group']['description'],
- data['security_group']['description'])
+ self.assertEqual(data['security_group']['description'],
+ res['security_group']['description'])
def test_check_default_security_group_description(self):
with self.network():
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
- self.assertEqual(len(groups['security_groups']), 1)
+ self.assertEqual(1, len(groups['security_groups']))
def test_create_default_security_group_fail(self):
name = 'default'
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_default_security_group_check_case_insensitive(self):
name = 'DEFAULT'
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_list_security_groups(self):
with self.security_group(name='sg1', description='sg') as v1,\
'22', None, None, ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_ethertype_invalid_for_protocol(self):
name = 'webservers'
security_group_id, 'ingress', const.PROTO_NAME_ICMP_V6)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_invalid_ip_prefix(self):
name = 'webservers'
'22', '22',
remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_invalid_ethertype_for_prefix(self):
name = 'webservers'
None,
ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_unmasked_prefix(self):
name = 'webservers'
security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
sg_rule = group['security_group']['security_group_rules']
self.assertEqual(group['security_group']['id'],
remote_group_id)
- self.assertEqual(len(sg_rule), 3)
+ self.assertEqual(3, len(sg_rule))
sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
neutron_context = context.Context('', 'test-tenant')
sg = self._list('security-groups',
neutron_context=neutron_context).get('security_groups')
- self.assertEqual(len(sg), 1)
+ self.assertEqual(1, len(sg))
def test_security_group_port_create_creates_default_security_group(self):
res = self._create_network(self.fmt, 'net1', True,
res = self._create_port(self.fmt, net1['network']['id'],
tenant_id='not_admin', set_context=True)
sg = self._list('security-groups').get('security_groups')
- self.assertEqual(len(sg), 1)
+ self.assertEqual(1, len(sg))
def test_default_security_group_rules(self):
with self.network():
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
]
- self.assertEqual(len(rules), 1)
+ self.assertEqual(1, len(rules))
v4_egress = rules[0]
expected = {'direction': 'egress',
r for r in sg_rules
if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
]
- self.assertEqual(len(rules), 1)
+ self.assertEqual(1, len(rules))
v6_egress = rules[0]
expected = {'direction': 'egress',
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
]
- self.assertEqual(len(rules), 1)
+ self.assertEqual(1, len(rules))
v4_ingress = rules[0]
expected = {'direction': 'ingress',
r for r in sg_rules
if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
]
- self.assertEqual(len(rules), 1)
+ self.assertEqual(1, len(rules))
v6_ingress = rules[0]
expected = {'direction': 'ingress',
security_groups=['bad_id'])
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
ret = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
self.assertEqual(2, len(ret['security_group_rules']))
def test_create_security_group_rule_bulk_emulated(self):
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_allow_all_ipv4(self):
with self.security_group() as sg:
res = self._create_security_group_rule(
self.fmt, {'security_group_rule': rule})
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self):
if self._skip_native_bulk:
rules = {'security_group_rules': [rule_v4, rule_v6]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_security_group_rule_different_security_group_ids(self):
if self._skip_native_bulk:
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_with_invalid_tcp_or_udp_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_port_with_non_uuid(self):
with self.network() as n:
def test_convert_ip_prefix_with_netmask_to_cidr(self):
addresses = ['10.1.0.0/16', '10.1.2.3/32', '2001:db8:1234::/48']
for addr in addresses:
- self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(addr), addr)
+ self.assertEqual(addr, ext_sg.convert_ip_prefix_to_cidr(addr))
class TestConvertProtocol(base.BaseTestCase):
ctx,
filters=dict(service_type=[constants.LOADBALANCER])
)
- self.assertEqual(len(res), 1)
+ self.assertEqual(1, len(res))
res = self.manager.get_service_providers(
ctx,
filters=dict(service_type=[constants.FIREWALL])
)
- self.assertEqual(len(res), 1)
+ self.assertEqual(1, len(res))
def test_multiple_default_providers_specified_for_service(self):
self.assertRaises(
# can pass None as a context
p = self.manager.get_default_service_provider(None,
constants.LOADBALANCER)
- self.assertEqual(p, {'service_type': constants.LOADBALANCER,
- 'name': 'lbaas1',
- 'driver': 'driver_path',
- 'default': True})
+ self.assertEqual({'service_type': constants.LOADBALANCER,
+ 'name': 'lbaas1',
+ 'driver': 'driver_path',
+ 'default': True}, p)
self.assertRaises(
provconf.DefaultServiceProviderNotFound,
instance.get_service_providers.assert_called_with(mock.ANY,
filters={},
fields=[])
- self.assertEqual(res.status_int, webexc.HTTPOk.code)
+ self.assertEqual(webexc.HTTPOk.code, res.status_int)
class ServiceTypeManagerExtTestCase(ServiceTypeExtensionTestCaseBase):
def test_list_service_providers(self):
res = self._list_service_providers()
- self.assertEqual(res.status_int, webexc.HTTPOk.code)
+ self.assertEqual(webexc.HTTPOk.code, res.status_int)
data = self.deserialize(res)
self.assertIn('service_providers', data)
self.assertGreaterEqual(len(data['service_providers']), 2)
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
- self.assertEqual(len(ips), 2)
+ self.assertEqual(2, len(ips))
add_expected = {'chg_ip':
{p1['network_id']:
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
- self.assertEqual(len(ips), 2)
+ self.assertEqual(2, len(ips))
upd_expected = {'chg_ip':
{p1['network_id']:
req = self.new_update_request('ports', data, p1['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
- self.assertEqual(len(ips), 1)
+ self.assertEqual(1, len(ips))
del_expected = {'chg_ip':
{p1['network_id']:
self.assertTrue(
port_2_data in self.agent.network_ports[NETWORK_ID]
)
- self.assertEqual(cleaned_port_id, PORT_1)
+ self.assertEqual(PORT_1, cleaned_port_id)
#and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
self.assertTrue(
NETWORK_ID not in self.agent.network_ports.keys()
)
- self.assertEqual(cleaned_port_id, port_2_data['port_id'])
+ self.assertEqual(port_2_data['port_id'], cleaned_port_id)
class TestLinuxBridgeManager(base.BaseTestCase):
def test_get_bridge_name(self):
nw_id = "123456789101112"
- self.assertEqual(self.lbm.get_bridge_name(nw_id),
- "brq" + nw_id[0:11])
+ self.assertEqual("brq" + nw_id[0:11],
+ self.lbm.get_bridge_name(nw_id))
nw_id = ""
- self.assertEqual(self.lbm.get_bridge_name(nw_id),
- "brq")
+ self.assertEqual("brq", self.lbm.get_bridge_name(nw_id))
def test_get_subinterface_name(self):
- self.assertEqual(self.lbm.get_subinterface_name("eth0", "0"),
- "eth0.0")
- self.assertEqual(self.lbm.get_subinterface_name("eth0", ""),
- "eth0.")
+ self.assertEqual("eth0.0",
+ self.lbm.get_subinterface_name("eth0", "0"))
+ self.assertEqual("eth0.", self.lbm.get_subinterface_name("eth0", ""))
def test_get_tap_device_name(self):
if_id = "123456789101112"
- self.assertEqual(self.lbm.get_tap_device_name(if_id),
- constants.TAP_DEVICE_PREFIX + if_id[0:11])
+ self.assertEqual(constants.TAP_DEVICE_PREFIX + if_id[0:11],
+ self.lbm.get_tap_device_name(if_id))
if_id = ""
- self.assertEqual(self.lbm.get_tap_device_name(if_id),
- constants.TAP_DEVICE_PREFIX)
+ self.assertEqual(constants.TAP_DEVICE_PREFIX,
+ self.lbm.get_tap_device_name(if_id))
def test_get_vxlan_device_name(self):
vn_id = p_const.MAX_VXLAN_VNI
- self.assertEqual(self.lbm.get_vxlan_device_name(vn_id),
- "vxlan-" + str(vn_id))
+ self.assertEqual("vxlan-" + str(vn_id),
+ self.lbm.get_vxlan_device_name(vn_id))
self.assertIsNone(self.lbm.get_vxlan_device_name(vn_id + 1))
def test_get_vxlan_group(self):
'SRIOV_NIC')
device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
- self.assertEqual(device_mappings, self.DEVICE_MAPPING)
+ self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_device_mappings_with_error(self):
cfg.CONF.set_override('physical_device_mappings',
'SRIOV_NIC')
device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
- self.assertEqual(device_mappings, self.DEVICE_MAPPING)
+ self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_exclude_devices(self):
cfg.CONF.set_override('exclude_devices',
'SRIOV_NIC')
exclude_devices = config.parse_exclude_devices(
cfg.CONF.SRIOV_NIC.exclude_devices)
- self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
+ self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
def test_exclude_devices_with_spaces(self):
cfg.CONF.set_override('exclude_devices',
'SRIOV_NIC')
exclude_devices = config.parse_exclude_devices(
cfg.CONF.SRIOV_NIC.exclude_devices)
- self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
+ self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
def test_exclude_devices_with_error(self):
cfg.CONF.set_override('exclude_devices',
config_parser.parse()
device_mappings = config_parser.device_mappings
exclude_devices = config_parser.exclude_devices
- self.assertEqual(exclude_devices, self.EXCLUDE_DEVICES)
- self.assertEqual(device_mappings, self.DEVICE_MAPPING)
+ self.assertEqual(self.EXCLUDE_DEVICES, exclude_devices)
+ self.assertEqual(self.DEVICE_MAPPING, device_mappings)
def test_validate_config_fail(self):
cfg.CONF.set_override('physical_device_mappings',
def test_add_metering_label(self):
self.agent.add_metering_label(None, ROUTERS)
- self.assertEqual(self.driver.add_metering_label.call_count, 1)
+ self.assertEqual(1, self.driver.add_metering_label.call_count)
def test_remove_metering_label(self):
self.agent.remove_metering_label(None, ROUTERS)
- self.assertEqual(self.driver.remove_metering_label.call_count, 1)
+ self.assertEqual(1, self.driver.remove_metering_label.call_count)
def test_update_metering_label_rule(self):
self.agent.update_metering_label_rules(None, ROUTERS)
- self.assertEqual(self.driver.update_metering_label_rules.call_count, 1)
+ self.assertEqual(1, self.driver.update_metering_label_rules.call_count)
def test_add_metering_label_rule(self):
self.agent.add_metering_label_rule(None, ROUTERS_WITH_RULE)
- self.assertEqual(self.driver.add_metering_label_rule.call_count, 1)
+ self.assertEqual(1, self.driver.add_metering_label_rule.call_count)
def test_remove_metering_label_rule(self):
self.agent.remove_metering_label_rule(None, ROUTERS_WITH_RULE)
- self.assertEqual(self.driver.remove_metering_label_rule.call_count, 1)
+ self.assertEqual(1, self.driver.remove_metering_label_rule.call_count)
def test_routers_updated(self):
self.agent.routers_updated(None, ROUTERS)
- self.assertEqual(self.driver.update_routers.call_count, 1)
+ self.assertEqual(1, self.driver.update_routers.call_count)
def test_get_traffic_counters(self):
self.agent._get_traffic_counters(None, ROUTERS)
- self.assertEqual(self.driver.get_traffic_counters.call_count, 1)
+ self.assertEqual(1, self.driver.get_traffic_counters.call_count)
def test_notification_report(self):
self.agent.routers_updated(None, ROUTERS)
if n['event_type'] == 'l3.meter':
break
- self.assertEqual(n['event_type'], 'l3.meter')
+ self.assertEqual('l3.meter', n['event_type'])
payload = n['payload']
- self.assertEqual(payload['tenant_id'], TENANT_ID)
- self.assertEqual(payload['label_id'], LABEL_ID)
- self.assertEqual(payload['pkts'], 88)
- self.assertEqual(payload['bytes'], 444)
+ self.assertEqual(TENANT_ID, payload['tenant_id'])
+ self.assertEqual(LABEL_ID, payload['label_id'])
+ self.assertEqual(88, payload['pkts'])
+ self.assertEqual(444, payload['bytes'])
def test_router_deleted(self):
label_id = _uuid()
self.agent.routers_updated(None, ROUTERS)
self.agent.router_deleted(None, ROUTERS[0]['id'])
- self.assertEqual(self.agent._add_metering_info.call_count, 1)
- self.assertEqual(self.driver.remove_router.call_count, 1)
+ self.assertEqual(1, self.agent._add_metering_info.call_count)
+ self.assertEqual(1, self.driver.remove_router.call_count)
self.agent._add_metering_info.assert_called_with(label_id, 44, 222)
'driver': 'driver_path',
'default': False}
res = provconf.parse_service_provider_opt()
- self.assertEqual(len(res), 1)
- self.assertEqual(res, [expected])
+ self.assertEqual(1, len(res))
+ self.assertEqual([expected], res)
def test_parse_single_default_service_provider_opt(self):
self._set_override([constants.LOADBALANCER +
'driver': 'driver_path',
'default': True}
res = provconf.parse_service_provider_opt()
- self.assertEqual(len(res), 1)
- self.assertEqual(res, [expected])
+ self.assertEqual(1, len(res))
+ self.assertEqual([expected], res)
def test_parse_multi_service_provider_opt(self):
self._set_override([constants.LOADBALANCER +
'driver': 'path',
'default': False}
pconf.add_provider(prov)
- self.assertEqual(len(pconf.providers), 1)
- self.assertEqual(list(pconf.providers.keys()),
- [(constants.LOADBALANCER, 'name')])
- self.assertEqual(list(pconf.providers.values()),
- [{'driver': 'path', 'default': False}])
+ self.assertEqual(1, len(pconf.providers))
+ self.assertEqual([(constants.LOADBALANCER, 'name')],
+ list(pconf.providers.keys()))
+ self.assertEqual([{'driver': 'path', 'default': False}],
+ list(pconf.providers.values()))
def test_add_duplicate_provider(self):
pconf = provconf.ProviderConfiguration()
'default': False}
pconf.add_provider(prov)
self.assertRaises(n_exc.Invalid, pconf.add_provider, prov)
- self.assertEqual(len(pconf.providers), 1)
+ self.assertEqual(1, len(pconf.providers))
def test_get_service_providers(self):
self._set_override([constants.LOADBALANCER + ':name:path',
filters={'name': [prov['name']],
'service_type': prov['service_type']}
)
- self.assertEqual(p, [prov])
+ self.assertEqual([prov], p)
def test_get_service_providers_with_fields(self):
self._set_override([constants.LOADBALANCER + ":name:path",
'service_type': prov['service_type']},
fields=['name']
)
- self.assertEqual(p, [{'name': prov['name']}])
+ self.assertEqual([{'name': prov['name']}], p)
class GetProviderDriverClassTestCase(base.BaseTestCase):
def test_no_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '401 Unauthorized')
+ self.assertEqual('401 Unauthorized', response.status)
def test_with_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.user_id, 'testuserid')
- self.assertEqual(self.context.user, 'testuserid')
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual('testuserid', self.context.user_id)
+ self.assertEqual('testuserid', self.context.user)
def test_with_tenant_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'test_user_id'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.tenant_id, 'testtenantid')
- self.assertEqual(self.context.tenant, 'testtenantid')
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual('testtenantid', self.context.tenant_id)
+ self.assertEqual('testtenantid', self.context.tenant)
def test_roles_no_admin(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLES'] = 'role1, role2 , role3,role4,role5'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
- 'role4', 'role5'])
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5'],
+ self.context.roles)
self.assertFalse(self.context.is_admin)
def test_roles_with_admin(self):
self.request.headers['X_ROLES'] = ('role1, role2 , role3,role4,role5,'
'AdMiN')
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
- 'role4', 'role5', 'AdMiN'])
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5',
+ 'AdMiN'], self.context.roles)
self.assertTrue(self.context.is_admin)
def test_with_user_tenant_name(self):
self.request.headers['X_PROJECT_NAME'] = 'testtenantname'
self.request.headers['X_USER_NAME'] = 'testusername'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.user_id, 'testuserid')
- self.assertEqual(self.context.user_name, 'testusername')
- self.assertEqual(self.context.tenant_id, 'testtenantid')
- self.assertEqual(self.context.tenant_name, 'testtenantname')
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual('testuserid', self.context.user_id)
+ self.assertEqual('testusername', self.context.user_name)
+ self.assertEqual('testtenantid', self.context.tenant_id)
+ self.assertEqual('testtenantname', self.context.tenant_name)
def test_request_id_extracted_from_env(self):
req_id = 'dummy-request-id'
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
- self.assertEqual(response.status, '200 OK')
- self.assertEqual(self.context.auth_token, 'testauthtoken')
+ self.assertEqual('200 OK', response.status)
+ self.assertEqual('testauthtoken', self.context.auth_token)
def test_without_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
actual = post_mortem_debug.get_ignored_traceback(root_tb)
if expected is not None:
expected = tracebacks[expected]
- self.assertEqual(actual, expected)
+ self.assertEqual(expected, actual)
def test_no_ignored_tracebacks(self):
self._test_get_ignored_traceback([0, 0, 0], None)