import webob.exc
from neutron.api.v2 import attributes as attr
+from neutron.common import constants as const
from neutron.common.test_lib import test_config
from neutron import context
from neutron.db import db_base_plugin_v2
port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
tenant_id='test_tenant',
- ethertype='IPv4'):
+ ethertype=const.IPv4):
data = {'security_group_rule': {'security_group_id': security_group_id,
'direction': direction,
def _make_security_group(self, fmt, name, description, **kwargs):
res = self._create_security_group(fmt, name, description, **kwargs)
- if res.status_int >= 400:
+ if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_security_group_rule(self, fmt, rules, **kwargs):
res = self._create_security_group_rule(self.fmt, rules)
- if res.status_int >= 400:
+ if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
'd1db38eb087',
- direction='ingress', protocol='tcp',
+ direction='ingress', protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22',
remote_ip_prefix=None, remote_group_id=None,
- fmt=None, no_delete=False, ethertype='IPv4'):
+ fmt=None, no_delete=False, ethertype=const.IPv4):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
sg_rules = security_group['security_group']['security_group_rules']
self.assertEqual(len(sg_rules), 2)
- v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules)
+ v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
self.assertEqual(len(v4_rules), 1)
v4_rule = v4_rules[0]
expected = {'direction': 'egress',
- 'ethertype': 'IPv4',
+ 'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
'port_range_min': None}
self._assert_sg_rule_has_kvs(v4_rule, expected)
- v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules)
+ v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
self.assertEqual(len(v6_rules), 1)
v6_rule = v6_rules[0]
expected = {'direction': 'egress',
- 'ethertype': 'IPv6',
+ 'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
sg['security_group']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_update_default_security_group_name_fail(self):
with self.network():
sg['security_groups'][0]['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_update_default_security_group_with_description(self):
with self.network():
description = 'my webservers'
res = self._create_security_group(self.fmt, name, description)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_list_security_groups(self):
with contextlib.nested(self.security_group(name='sg1',
security_group_id = sg['security_group']['id']
ethertype = 2
rule = self._build_security_group_rule(
- security_group_id, 'ingress', 'tcp', '22', '22', None, None,
- ethertype=ethertype)
+ security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
+ '22', None, None, ethertype=ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_tcp_protocol_as_number(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
- protocol = 6 # TCP
+ protocol = const.PROTO_NUM_TCP # TCP
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '22', '22')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_protocol_as_number(self):
name = 'webservers'
security_group_id, 'ingress', protocol)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
self.assertEqual(rule['security_group_rule']['protocol'],
protocol.lower())
self.assertEqual(rule['security_group_rule']['ethertype'],
- 'IPv4')
+ const.IPv4)
def test_get_security_group(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
self.assertEqual(group['security_group']['id'],
remote_group_id)
self.assertEqual(len(sg_rule), 3)
- sg_rule = filter(lambda x: x['direction'] == 'ingress',
- sg_rule)
+ sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
description = 'my webservers'
with self.security_group(name, description, no_delete=True) as sg:
remote_group_id = sg['security_group']['id']
- self._delete('security-groups', remote_group_id, 204)
+ self._delete('security-groups', remote_group_id,
+ webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_admin(self):
with self.network():
res = self.new_list_request('security-groups')
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
self._delete('security-groups', sg['security_groups'][0]['id'],
- 204)
+ webob.exc.HTTPNoContent.code)
def test_delete_default_security_group_nonadmin(self):
with self.network():
sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
neutron_context = context.Context('', 'test-tenant')
self._delete('security-groups', sg['security_groups'][0]['id'],
- 409, neutron_context=neutron_context)
+ webob.exc.HTTPConflict.code,
+ neutron_context=neutron_context)
def test_security_group_list_creates_default_security_group(self):
neutron_context = context.Context('', 'test-tenant')
# Verify default rule for v4 egress
sg_rules = rules['security_group_rules']
- rules = filter(
- lambda x: (
- x['direction'] == 'egress' and x['ethertype'] == 'IPv4'),
- sg_rules)
+ rules = [
+ r for r in sg_rules
+ if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
+ ]
self.assertEqual(len(rules), 1)
v4_egress = rules[0]
expected = {'direction': 'egress',
- 'ethertype': 'IPv4',
+ 'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
self._assert_sg_rule_has_kvs(v4_egress, expected)
# Verify default rule for v6 egress
- rules = filter(
- lambda x: (
- x['direction'] == 'egress' and x['ethertype'] == 'IPv6'),
- sg_rules)
+ rules = [
+ r for r in sg_rules
+ if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
+ ]
self.assertEqual(len(rules), 1)
v6_egress = rules[0]
expected = {'direction': 'egress',
- 'ethertype': 'IPv6',
+ 'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
self._assert_sg_rule_has_kvs(v6_egress, expected)
# Verify default rule for v4 ingress
- rules = filter(
- lambda x: (
- x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'),
- sg_rules)
+ rules = [
+ r for r in sg_rules
+ if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
+ ]
self.assertEqual(len(rules), 1)
v4_ingress = rules[0]
expected = {'direction': 'ingress',
- 'ethertype': 'IPv4',
+ 'ethertype': const.IPv4,
'remote_group_id': security_group_id,
'remote_ip_prefix': None,
'protocol': None,
self._assert_sg_rule_has_kvs(v4_ingress, expected)
# Verify default rule for v6 ingress
- rules = filter(
- lambda x: (
- x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'),
- sg_rules)
+ rules = [
+ r for r in sg_rules
+ if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
+ ]
self.assertEqual(len(rules), 1)
v6_ingress = rules[0]
expected = {'direction': 'ingress',
- 'ethertype': 'IPv6',
+ 'ethertype': const.IPv6,
'remote_group_id': security_group_id,
'remote_ip_prefix': None,
'protocol': None,
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_group_id', remote_group_id),
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'icmp'
+ protocol = const.PROTO_NAME_ICMP
# port_range_min (ICMP type) is greater than port_range_max
# (ICMP code) in order to confirm min <= max port check is
# not called for ICMP.
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'icmp'
+ protocol = const.PROTO_NAME_ICMP
# ICMP type
port_range_min = 8
# ICMP code
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_bad_security_group_id(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
- 'protocol': 'tcp',
+ 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': "bad_tenant"}}
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_remote_group_id(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg2['security_group']['id'],
'direction': 'ingress',
- 'protocol': 'tcp',
+ 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant',
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
with self.security_group() as sg:
rule = {'security_group_rule':
{'security_group_id': sg['security_group']['id'],
'direction': 'ingress',
- 'protocol': 'tcp',
+ 'protocol': const.PROTO_NAME_TCP,
'port_range_min': '22',
'port_range_max': '22',
'tenant_id': 'bad_tenant'}}
tenant_id='bad_tenant',
set_context=True)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_bad_remote_group_id(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
rule = self._build_security_group_rule(security_group_id, direction,
remote_group_id=remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
+ self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_create_security_group_rule_duplicate_rules(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_min_port_greater_max(self):
name = 'webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
- for protocol in ['tcp', 'udp', 6, 17]:
+ for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
+ const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
rule = self._build_security_group_rule(
sg['security_group']['id'],
'ingress', protocol, '50', '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int,
+ webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_ports_but_no_protocol(self):
name = 'webservers'
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_min_only(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '22', None)
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', None)
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_port_range_max_only(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', None, '22')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, None, '22')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_type_too_big(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'icmp', '256', None)
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_ICMP, '256', None)
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_code_too_big(self):
name = 'webservers'
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'icmp', '8', '256')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_ICMP, '8', '256')
self._create_security_group_rule(self.fmt, rule)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_list_ports_security_group(self):
with self.network() as n:
security_groups=['bad_id'])
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_delete_security_group_port_in_use(self):
with self.network() as n:
sg['security_group']['id'])
# try to delete security group that's in use
res = self._delete('security-groups',
- sg['security_group']['id'], 409)
+ sg['security_group']['id'],
+ webob.exc.HTTPConflict.code)
# delete the blocking port
self._delete('ports', port['port']['id'])
"security_group_rule create")
with self.security_group() as sg:
rule1 = self._build_security_group_rule(sg['security_group']['id'],
- 'ingress', 'tcp', '22',
+ 'ingress',
+ const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(sg['security_group']['id'],
- 'ingress', 'tcp', '23',
+ 'ingress',
+ const.PROTO_NAME_TCP, '23',
'23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_bulk_emulated(self):
real_has_attr = hasattr
new=fakehasattr):
with self.security_group() as sg:
rule1 = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
- '10.0.0.1/24')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
- '10.0.0.1/24')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
- 'ingress', 'tcp', '22',
+ 'ingress',
+ const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
real_has_attr = hasattr
with self.security_group() as sg:
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
- '10.0.0.1/24')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule['security_group_rule'],
rule['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db(self):
if self._skip_native_bulk:
"security_group_rule create")
with self.security_group() as sg:
rule = self._build_security_group_rule(sg['security_group']['id'],
- 'ingress', 'tcp', '22',
+ 'ingress',
+ const.PROTO_NAME_TCP, '22',
'22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rules)
rule = self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
real_has_attr = hasattr
new=fakehasattr):
with self.security_group() as sg:
rule = self._build_security_group_rule(
- sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
- '10.0.0.1/24')
+ sg['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rules = {'security_group_rules': [rule]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
+ self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_create_security_group_rule_differnt_security_group_ids(self):
if self._skip_native_bulk:
with self.security_group() as sg1:
with self.security_group() as sg2:
rule1 = self._build_security_group_rule(
- sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
- '10.0.0.1/24')
+ sg1['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
rule2 = self._build_security_group_rule(
- sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
- '10.0.0.1/24')
+ sg2['security_group']['id'], 'ingress',
+ const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
rules = {'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]
}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_ethertype(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
ethertype='IPv5')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_invalid_protocol(self):
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
remote_group_id)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_port_with_non_uuid(self):
with self.network() as n:
security_groups=['not_valid'])
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
+ self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
class TestSecurityGroupsXML(TestSecurityGroups):
from mock import call
import mox
from oslo.config import cfg
+import webob.exc
from neutron.agent import firewall as firewall_base
from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.common import constants as const
from neutron import context
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
self.rpc = FakeSGCallback()
def test_security_group_rules_for_devices_ipv4_ingress(self):
- fake_prefix = test_fw.FAKE_PREFIX['IPv4']
+ fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'tcp', '22',
+ 'ingress', const.PROTO_NAME_TCP, '22',
'22')
rule2 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'tcp', '23',
+ 'ingress', const.PROTO_NAME_TCP, '23',
'23', fake_prefix)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'ingress',
- 'protocol': 'tcp', 'ethertype': 'IPv4',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
- {'direction': 'ingress', 'protocol': 'tcp',
- 'ethertype': 'IPv4',
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv4_egress(self):
- fake_prefix = test_fw.FAKE_PREFIX['IPv4']
+ fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
with self.network() as n:
with nested(self.subnet(n),
self.security_group()) as (subnet_v4,
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'egress', 'tcp', '22',
+ 'egress', const.PROTO_NAME_TCP, '22',
'22')
rule2 = self._build_security_group_rule(
sg1_id,
- 'egress', 'udp', '23',
+ 'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress',
- 'protocol': 'tcp', 'ethertype': 'IPv4',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
- {'direction': 'egress', 'protocol': 'udp',
- 'ethertype': 'IPv4',
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv4,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'dest_ip_prefix': fake_prefix},
sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'tcp', '24',
+ 'ingress', const.PROTO_NAME_TCP, '24',
'25', remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv4',
+ {'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32',
- 'protocol': u'tcp', 'ethertype': u'IPv4',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
self._delete('ports', port_id2)
def test_security_group_rules_for_devices_ipv6_ingress(self):
- fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+ fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'tcp', '22',
+ 'ingress', const.PROTO_NAME_TCP, '22',
'22',
- ethertype='IPv6')
+ ethertype=const.IPv6)
rule2 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'udp', '23',
+ 'ingress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix,
- ethertype='IPv6')
+ ethertype=const.IPv6)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'ingress',
- 'protocol': 'tcp', 'ethertype': 'IPv6',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
- {'direction': 'ingress', 'protocol': 'udp',
- 'ethertype': 'IPv6',
+ {'direction': 'ingress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv6,
'port_range_max': 23, 'security_group_id': sg1_id,
'port_range_min': 23,
'source_ip_prefix': fake_prefix},
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_egress(self):
- fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+ fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'egress', 'tcp', '22',
+ 'egress', const.PROTO_NAME_TCP, '22',
'22',
- ethertype='IPv6')
+ ethertype=const.IPv6)
rule2 = self._build_security_group_rule(
sg1_id,
- 'egress', 'udp', '23',
+ 'egress', const.PROTO_NAME_UDP, '23',
'23', fake_prefix,
- ethertype='IPv6')
+ ethertype=const.IPv6)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
{'direction': 'egress',
- 'protocol': 'tcp', 'ethertype': 'IPv6',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
'port_range_max': 22,
'security_group_id': sg1_id,
'port_range_min': 22},
- {'direction': 'egress', 'protocol': 'udp',
- 'ethertype': 'IPv6',
- 'port_range_max': 23, 'security_group_id': sg1_id,
+ {'direction': 'egress',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv6,
+ 'port_range_max': 23,
+ 'security_group_id': sg1_id,
'port_range_min': 23,
'dest_ip_prefix': fake_prefix},
]
self._delete('ports', port_id1)
def test_security_group_rules_for_devices_ipv6_source_group(self):
- fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+ fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
with self.network() as n:
with nested(self.subnet(n,
cidr=fake_prefix,
sg2_id = sg2['security_group']['id']
rule1 = self._build_security_group_rule(
sg1_id,
- 'ingress', 'tcp', '24',
+ 'ingress', const.PROTO_NAME_TCP, '24',
'25',
- ethertype='IPv6',
+ ethertype=const.IPv6,
remote_group_id=sg2['security_group']['id'])
rules = {
'security_group_rules': [rule1['security_group_rule']]}
res = self._create_security_group_rule(self.fmt, rules)
self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
res1 = self._create_port(
self.fmt, n['network']['id'],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
- expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+ expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg1_id},
- {'direction': 'egress', 'ethertype': 'IPv4',
+ {'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg2_id},
- {'direction': 'egress', 'ethertype': 'IPv6',
+ {'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': 'ingress',
'source_ip_prefix': 'fe80::3/128',
- 'protocol': 'tcp', 'ethertype': 'IPv6',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24,
'remote_group_id': sg2_id,
'security_group_id': sg1_id},
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
rule1 = [{'direction': 'ingress',
- 'protocol': 'udp',
- 'ethertype': 'IPv4',
+ 'protocol': const.PROTO_NAME_UDP,
+ 'ethertype': const.IPv4,
'source_ip_prefix': '10.0.0.2',
'source_port_range_min': 67,
'source_port_range_max': 67,
'port_range_min': 68,
'port_range_max': 68},
{'direction': 'ingress',
- 'protocol': 'tcp',
- 'ethertype': 'IPv4',
+ 'protocol': const.PROTO_NAME_TCP,
+ 'ethertype': const.IPv4,
'port_range_min': 22,
'port_range_max': 22},
{'direction': 'egress',
- 'ethertype': 'IPv4'}]
+ 'ethertype': const.IPv4}]
rule2 = rule1[:]
rule2 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.4',
- 'ethertype': 'IPv4'}]
+ 'ethertype': const.IPv4}]
rule3 = rule2[:]
rule3 += [{'direction': 'ingress',
- 'protocol': 'icmp',
- 'ethertype': 'IPv4'}]
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv4}]
rule4 = rule1[:]
rule4 += [{'direction': 'ingress',
'source_ip_prefix': '10.0.0.3',
- 'ethertype': 'IPv4'}]
+ 'ethertype': const.IPv4}]
rule5 = rule4[:]
rule5 += [{'direction': 'ingress',
- 'protocol': 'icmp',
- 'ethertype': 'IPv4'}]
+ 'protocol': const.PROTO_NAME_ICMP,
+ 'ethertype': const.IPv4}]
self.devices1 = {'tap_port1': self._device('tap_port1',
'10.0.0.3',
'12:34:56:78:9a:bc',
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
- protocol = 'tcp'
+ protocol = const.PROTO_NAME_TCP
port_range_min = 88
port_range_max = 88
with self.security_group_rule(security_group_id, direction,