]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Change hard coded numbers to constants in security group tests
authorArvind Somya <asomya@cisco.com>
Wed, 4 Sep 2013 16:20:14 +0000 (12:20 -0400)
committerArvind Somya <asomya@cisco.com>
Mon, 23 Sep 2013 14:21:24 +0000 (10:21 -0400)
Security groups tests in Neutron contain a lot of hard coded HTTP return codes
and protocol numbers. These should be changed to use constants.

Change-Id: Ibecff3821c54f12848a05648f35381a0c73a0896
Fixes: Bug 1218928
neutron/common/constants.py
neutron/db/securitygroups_db.py
neutron/extensions/securitygroup.py
neutron/plugins/midonet/common/net_util.py
neutron/plugins/nicira/nvplib.py
neutron/tests/unit/test_extension_security_group.py
neutron/tests/unit/test_security_groups_rpc.py

index 016fa94c6f3ee3f5a7c2a60daae0af77d4094f01..dfa8f1fd927b0a8f952fc5c1007ef1de83ef78d1 100644 (file)
@@ -35,11 +35,6 @@ METERING_LABEL_KEY = '_metering_labels'
 IPv4 = 'IPv4'
 IPv6 = 'IPv6'
 
-ICMP_PROTOCOL = 1
-TCP_PROTOCOL = 6
-UDP_PROTOCOL = 17
-ICMPv6_PROTOCOL = 58
-
 DHCP_RESPONSE_PORT = 68
 
 MIN_VLAN_TAG = 1
@@ -87,3 +82,13 @@ PORT_BINDING_EXT_ALIAS = 'binding'
 L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
 DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
 LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
+
+# Protocol names and numbers for Security Groups/Firewalls
+PROTO_NAME_TCP = 'tcp'
+PROTO_NAME_ICMP = 'icmp'
+PROTO_NAME_ICMP_V6 = 'icmpv6'
+PROTO_NAME_UDP = 'udp'
+PROTO_NUM_TCP = 6
+PROTO_NUM_ICMP = 1
+PROTO_NUM_ICMP_V6 = 58
+PROTO_NUM_UDP = 17
index 89b66d2d3a6e0e16c15c75c4c358912f50ed465a..5986190c5d466dda91806850b1cd3698d6bd5208 100644 (file)
@@ -30,10 +30,10 @@ from neutron.extensions import securitygroup as ext_sg
 from neutron.openstack.common import uuidutils
 
 
-IP_PROTOCOL_MAP = {'tcp': constants.TCP_PROTOCOL,
-                   'udp': constants.UDP_PROTOCOL,
-                   'icmp': constants.ICMP_PROTOCOL,
-                   'icmpv6': constants.ICMPv6_PROTOCOL}
+IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
+                   constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
+                   constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP,
+                   constants.PROTO_NAME_ICMP_V6: constants.PROTO_NUM_ICMP_V6}
 
 
 class SecurityGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@@ -304,13 +304,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
         if not rule['protocol']:
             raise ext_sg.SecurityGroupProtocolRequiredWithPorts()
         ip_proto = self._get_ip_proto_number(rule['protocol'])
-        if ip_proto in [constants.TCP_PROTOCOL, constants.UDP_PROTOCOL]:
+        if ip_proto in [constants.PROTO_NUM_TCP, constants.PROTO_NUM_UDP]:
             if (rule['port_range_min'] is not None and
                 rule['port_range_min'] <= rule['port_range_max']):
                 pass
             else:
                 raise ext_sg.SecurityGroupInvalidPortRange()
-        elif ip_proto == constants.ICMP_PROTOCOL:
+        elif ip_proto == constants.PROTO_NUM_ICMP:
             for attr, field in [('port_range_min', 'type'),
                                 ('port_range_max', 'code')]:
                 if rule[attr] > 255:
index ebc1f780bc4a558aa96aef4f3a2e3fcffdf5323e..85d499ad59f77dbd811532a31872f06840d716b8 100644 (file)
@@ -23,6 +23,7 @@ from oslo.config import cfg
 from neutron.api import extensions
 from neutron.api.v2 import attributes as attr
 from neutron.api.v2 import base
+from neutron.common import constants as const
 from neutron.common import exceptions as qexception
 from neutron import manager
 from neutron.openstack.common import uuidutils
@@ -158,7 +159,8 @@ def _validate_name_not_default(data, valid_values=None):
 
 attr.validators['type:name_not_default'] = _validate_name_not_default
 
-sg_supported_protocols = [None, 'tcp', 'udp', 'icmp']
+sg_supported_protocols = [None, const.PROTO_NAME_TCP,
+                          const.PROTO_NAME_UDP, const.PROTO_NAME_ICMP]
 sg_supported_ethertypes = ['IPv4', 'IPv6']
 
 # Attribute Map
index 26479711c37b7a05a9845c854ba9a8359679907e..884048675f49ea3c901374655bc53af1b4b7b39a 100644 (file)
@@ -61,8 +61,8 @@ def get_protocol_value(protocol):
         return protocol
 
     mapping = {
-        'tcp': constants.TCP_PROTOCOL,
-        'udp': constants.UDP_PROTOCOL,
-        'icmp': constants.ICMP_PROTOCOL
+        constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
+        constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
+        constants.PROTO_NAME_ICMP: constants.PROTO_NUM_ICMP
     }
     return mapping.get(protocol.lower())
index 0842a24445193cdebe82d2e41c96c2ea9abd7ea1..37802f1d5b8a8b1108bf33eb3e66ca94300c3969 100644 (file)
@@ -1073,7 +1073,7 @@ def create_security_profile(cluster, tenant_id, security_profile):
     # Allow all dhcp responses and all ingress traffic
     hidden_rules = {'logical_port_egress_rules':
                     [{'ethertype': 'IPv4',
-                      'protocol': constants.UDP_PROTOCOL,
+                      'protocol': constants.PROTO_NUM_UDP,
                       'port_range_min': constants.DHCP_RESPONSE_PORT,
                       'port_range_max': constants.DHCP_RESPONSE_PORT,
                       'ip_prefix': '0.0.0.0/0'}],
@@ -1111,7 +1111,7 @@ def update_security_group_rules(cluster, spid, rules):
 
     # Allow all dhcp responses in
     rules['logical_port_egress_rules'].append(
-        {'ethertype': 'IPv4', 'protocol': constants.UDP_PROTOCOL,
+        {'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
          'port_range_min': constants.DHCP_RESPONSE_PORT,
          'port_range_max': constants.DHCP_RESPONSE_PORT,
          'ip_prefix': '0.0.0.0/0'})
index 17c87335122bdd42864d04d683d7ec3e2a334c15..d0f6ae0bff6febaccafafba7ce8db203245d7c9d 100644 (file)
@@ -20,6 +20,7 @@ import mock
 import webob.exc
 
 from neutron.api.v2 import attributes as attr
+from neutron.common import constants as const
 from neutron.common.test_lib import test_config
 from neutron import context
 from neutron.db import db_base_plugin_v2
@@ -75,7 +76,7 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
                                    port_range_min=None, port_range_max=None,
                                    remote_ip_prefix=None, remote_group_id=None,
                                    tenant_id='test_tenant',
-                                   ethertype='IPv4'):
+                                   ethertype=const.IPv4):
 
         data = {'security_group_rule': {'security_group_id': security_group_id,
                                         'direction': direction,
@@ -110,13 +111,13 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
 
     def _make_security_group(self, fmt, name, description, **kwargs):
         res = self._create_security_group(fmt, name, description, **kwargs)
-        if res.status_int >= 400:
+        if res.status_int >= webob.exc.HTTPBadRequest.code:
             raise webob.exc.HTTPClientError(code=res.status_int)
         return self.deserialize(fmt, res)
 
     def _make_security_group_rule(self, fmt, rules, **kwargs):
         res = self._create_security_group_rule(self.fmt, rules)
-        if res.status_int >= 400:
+        if res.status_int >= webob.exc.HTTPBadRequest.code:
             raise webob.exc.HTTPClientError(code=res.status_int)
         return self.deserialize(fmt, res)
 
@@ -136,10 +137,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
     @contextlib.contextmanager
     def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
                                                     'd1db38eb087',
-                            direction='ingress', protocol='tcp',
+                            direction='ingress', protocol=const.PROTO_NAME_TCP,
                             port_range_min='22', port_range_max='22',
                             remote_ip_prefix=None, remote_group_id=None,
-                            fmt=None, no_delete=False, ethertype='IPv4'):
+                            fmt=None, no_delete=False, ethertype=const.IPv4):
         if not fmt:
             fmt = self.fmt
         rule = self._build_security_group_rule(security_group_id,
@@ -263,11 +264,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         sg_rules = security_group['security_group']['security_group_rules']
         self.assertEqual(len(sg_rules), 2)
 
-        v4_rules = filter(lambda x: x['ethertype'] == 'IPv4', sg_rules)
+        v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]
         self.assertEqual(len(v4_rules), 1)
         v4_rule = v4_rules[0]
         expected = {'direction': 'egress',
-                    'ethertype': 'IPv4',
+                    'ethertype': const.IPv4,
                     'remote_group_id': None,
                     'remote_ip_prefix': None,
                     'protocol': None,
@@ -275,11 +276,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                     'port_range_min': None}
         self._assert_sg_rule_has_kvs(v4_rule, expected)
 
-        v6_rules = filter(lambda x: x['ethertype'] == 'IPv6', sg_rules)
+        v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]
         self.assertEqual(len(v6_rules), 1)
         v6_rule = v6_rules[0]
         expected = {'direction': 'egress',
-                    'ethertype': 'IPv6',
+                    'ethertype': const.IPv6,
                     'remote_group_id': None,
                     'remote_ip_prefix': None,
                     'protocol': None,
@@ -309,7 +310,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                           sg['security_group']['id'])
             req.environ['neutron.context'] = context.Context('', 'somebody')
             res = req.get_response(self.ext_api)
-            self.assertEqual(res.status_int, 409)
+            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_update_default_security_group_name_fail(self):
         with self.network():
@@ -322,7 +323,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                           sg['security_groups'][0]['id'])
             req.environ['neutron.context'] = context.Context('', 'somebody')
             res = req.get_response(self.ext_api)
-            self.assertEqual(res.status_int, 404)
+            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_update_default_security_group_with_description(self):
         with self.network():
@@ -347,7 +348,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         description = 'my webservers'
         res = self._create_security_group(self.fmt, name, description)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 409)
+        self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_list_security_groups(self):
         with contextlib.nested(self.security_group(name='sg1',
@@ -406,23 +407,23 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             ethertype = 2
             rule = self._build_security_group_rule(
-                security_group_id, 'ingress', 'tcp', '22', '22', None, None,
-                ethertype=ethertype)
+                security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',
+                '22', None, None, ethertype=ethertype)
             res = self._create_security_group_rule(self.fmt, rule)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 400)
+            self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_tcp_protocol_as_number(self):
         name = 'webservers'
         description = 'my webservers'
         with self.security_group(name, description) as sg:
             security_group_id = sg['security_group']['id']
-            protocol = 6  # TCP
+            protocol = const.PROTO_NUM_TCP  # TCP
             rule = self._build_security_group_rule(
                 security_group_id, 'ingress', protocol, '22', '22')
             res = self._create_security_group_rule(self.fmt, rule)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 201)
+            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
     def test_create_security_group_rule_protocol_as_number(self):
         name = 'webservers'
@@ -434,7 +435,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 security_group_id, 'ingress', protocol)
             res = self._create_security_group_rule(self.fmt, rule)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 201)
+            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
     def test_create_security_group_rule_case_insensitive(self):
         name = 'webservers'
@@ -457,7 +458,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 self.assertEqual(rule['security_group_rule']['protocol'],
                                  protocol.lower())
                 self.assertEqual(rule['security_group_rule']['ethertype'],
-                                 'IPv4')
+                                 const.IPv4)
 
     def test_get_security_group(self):
         name = 'webservers'
@@ -468,7 +469,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             direction = "ingress"
             remote_ip_prefix = "10.0.0.0/24"
-            protocol = 'tcp'
+            protocol = const.PROTO_NAME_TCP
             port_range_min = 22
             port_range_max = 22
             keys = [('remote_ip_prefix', remote_ip_prefix),
@@ -488,8 +489,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 self.assertEqual(group['security_group']['id'],
                                  remote_group_id)
                 self.assertEqual(len(sg_rule), 3)
-                sg_rule = filter(lambda x: x['direction'] == 'ingress',
-                                 sg_rule)
+                sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
                 for k, v, in keys:
                     self.assertEqual(sg_rule[0][k], v)
 
@@ -498,14 +498,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         description = 'my webservers'
         with self.security_group(name, description, no_delete=True) as sg:
             remote_group_id = sg['security_group']['id']
-            self._delete('security-groups', remote_group_id, 204)
+            self._delete('security-groups', remote_group_id,
+                         webob.exc.HTTPNoContent.code)
 
     def test_delete_default_security_group_admin(self):
         with self.network():
             res = self.new_list_request('security-groups')
             sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
             self._delete('security-groups', sg['security_groups'][0]['id'],
-                         204)
+                         webob.exc.HTTPNoContent.code)
 
     def test_delete_default_security_group_nonadmin(self):
         with self.network():
@@ -513,7 +514,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             sg = self.deserialize(self.fmt, res.get_response(self.ext_api))
             neutron_context = context.Context('', 'test-tenant')
             self._delete('security-groups', sg['security_groups'][0]['id'],
-                         409, neutron_context=neutron_context)
+                         webob.exc.HTTPConflict.code,
+                         neutron_context=neutron_context)
 
     def test_security_group_list_creates_default_security_group(self):
         neutron_context = context.Context('', 'test-tenant')
@@ -533,15 +535,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
 
             # Verify default rule for v4 egress
             sg_rules = rules['security_group_rules']
-            rules = filter(
-                lambda x: (
-                    x['direction'] == 'egress' and x['ethertype'] == 'IPv4'),
-                sg_rules)
+            rules = [
+                r for r in sg_rules
+                if r['direction'] == 'egress' and r['ethertype'] == const.IPv4
+            ]
             self.assertEqual(len(rules), 1)
             v4_egress = rules[0]
 
             expected = {'direction': 'egress',
-                        'ethertype': 'IPv4',
+                        'ethertype': const.IPv4,
                         'remote_group_id': None,
                         'remote_ip_prefix': None,
                         'protocol': None,
@@ -550,15 +552,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             self._assert_sg_rule_has_kvs(v4_egress, expected)
 
             # Verify default rule for v6 egress
-            rules = filter(
-                lambda x: (
-                    x['direction'] == 'egress' and x['ethertype'] == 'IPv6'),
-                sg_rules)
+            rules = [
+                r for r in sg_rules
+                if r['direction'] == 'egress' and r['ethertype'] == const.IPv6
+            ]
             self.assertEqual(len(rules), 1)
             v6_egress = rules[0]
 
             expected = {'direction': 'egress',
-                        'ethertype': 'IPv6',
+                        'ethertype': const.IPv6,
                         'remote_group_id': None,
                         'remote_ip_prefix': None,
                         'protocol': None,
@@ -567,15 +569,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             self._assert_sg_rule_has_kvs(v6_egress, expected)
 
             # Verify default rule for v4 ingress
-            rules = filter(
-                lambda x: (
-                    x['direction'] == 'ingress' and x['ethertype'] == 'IPv4'),
-                sg_rules)
+            rules = [
+                r for r in sg_rules
+                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4
+            ]
             self.assertEqual(len(rules), 1)
             v4_ingress = rules[0]
 
             expected = {'direction': 'ingress',
-                        'ethertype': 'IPv4',
+                        'ethertype': const.IPv4,
                         'remote_group_id': security_group_id,
                         'remote_ip_prefix': None,
                         'protocol': None,
@@ -584,15 +586,15 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             self._assert_sg_rule_has_kvs(v4_ingress, expected)
 
             # Verify default rule for v6 ingress
-            rules = filter(
-                lambda x: (
-                    x['direction'] == 'ingress' and x['ethertype'] == 'IPv6'),
-                sg_rules)
+            rules = [
+                r for r in sg_rules
+                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6
+            ]
             self.assertEqual(len(rules), 1)
             v6_ingress = rules[0]
 
             expected = {'direction': 'ingress',
-                        'ethertype': 'IPv6',
+                        'ethertype': const.IPv6,
                         'remote_group_id': security_group_id,
                         'remote_ip_prefix': None,
                         'protocol': None,
@@ -607,7 +609,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             direction = "ingress"
             remote_ip_prefix = "10.0.0.0/24"
-            protocol = 'tcp'
+            protocol = const.PROTO_NAME_TCP
             port_range_min = 22
             port_range_max = 22
             keys = [('remote_ip_prefix', remote_ip_prefix),
@@ -631,7 +633,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 security_group_id = sg['security_group']['id']
                 direction = "ingress"
                 remote_group_id = sg2['security_group']['id']
-                protocol = 'tcp'
+                protocol = const.PROTO_NAME_TCP
                 port_range_min = 22
                 port_range_max = 22
                 keys = [('remote_group_id', remote_group_id),
@@ -655,7 +657,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             direction = "ingress"
             remote_ip_prefix = "10.0.0.0/24"
-            protocol = 'icmp'
+            protocol = const.PROTO_NAME_ICMP
             # port_range_min (ICMP type) is greater than port_range_max
             # (ICMP code) in order to confirm min <= max port check is
             # not called for ICMP.
@@ -681,7 +683,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             direction = "ingress"
             remote_ip_prefix = "10.0.0.0/24"
-            protocol = 'icmp'
+            protocol = const.PROTO_NAME_ICMP
             # ICMP type
             port_range_min = 8
             # ICMP code
@@ -703,7 +705,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
         direction = "ingress"
         remote_ip_prefix = "10.0.0.0/24"
-        protocol = 'tcp'
+        protocol = const.PROTO_NAME_TCP
         port_range_min = 22
         port_range_max = 22
         remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@@ -714,13 +716,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                remote_group_id)
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 400)
+        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_bad_security_group_id(self):
         security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
         direction = "ingress"
         remote_ip_prefix = "10.0.0.0/24"
-        protocol = 'tcp'
+        protocol = const.PROTO_NAME_TCP
         port_range_min = 22
         port_range_max = 22
         rule = self._build_security_group_rule(security_group_id, direction,
@@ -729,21 +731,21 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                remote_ip_prefix)
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 404)
+        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_create_security_group_rule_bad_tenant(self):
         with self.security_group() as sg:
             rule = {'security_group_rule':
                     {'security_group_id': sg['security_group']['id'],
                      'direction': 'ingress',
-                     'protocol': 'tcp',
+                     'protocol': const.PROTO_NAME_TCP,
                      'port_range_min': '22',
                      'port_range_max': '22',
                      'tenant_id': "bad_tenant"}}
 
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 404)
+        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_create_security_group_rule_bad_tenant_remote_group_id(self):
         with self.security_group() as sg:
@@ -754,7 +756,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             rule = {'security_group_rule':
                     {'security_group_id': sg2['security_group']['id'],
                      'direction': 'ingress',
-                     'protocol': 'tcp',
+                     'protocol': const.PROTO_NAME_TCP,
                      'port_range_min': '22',
                      'port_range_max': '22',
                      'tenant_id': 'bad_tenant',
@@ -764,7 +766,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                    tenant_id='bad_tenant',
                                                    set_context=True)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 404)
+            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_create_security_group_rule_bad_tenant_security_group_rule(self):
         with self.security_group() as sg:
@@ -775,7 +777,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             rule = {'security_group_rule':
                     {'security_group_id': sg['security_group']['id'],
                      'direction': 'ingress',
-                     'protocol': 'tcp',
+                     'protocol': const.PROTO_NAME_TCP,
                      'port_range_min': '22',
                      'port_range_max': '22',
                      'tenant_id': 'bad_tenant'}}
@@ -784,7 +786,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                    tenant_id='bad_tenant',
                                                    set_context=True)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 404)
+            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_create_security_group_rule_bad_remote_group_id(self):
         name = 'webservers'
@@ -793,7 +795,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
             direction = "ingress"
-            protocol = 'tcp'
+            protocol = const.PROTO_NAME_TCP
             port_range_min = 22
             port_range_max = 22
         rule = self._build_security_group_rule(security_group_id, direction,
@@ -802,7 +804,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                remote_group_id=remote_group_id)
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 404)
+        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
 
     def test_create_security_group_rule_duplicate_rules(self):
         name = 'webservers'
@@ -811,11 +813,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', '22')
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 409)
+                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_create_security_group_rule_min_port_greater_max(self):
         name = 'webservers'
@@ -823,14 +826,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         with self.security_group(name, description) as sg:
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
-                for protocol in ['tcp', 'udp', 6, 17]:
+                for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
+                                 const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
                     rule = self._build_security_group_rule(
                         sg['security_group']['id'],
                         'ingress', protocol, '50', '22')
                     self._create_security_group_rule(self.fmt, rule)
                     res = self._create_security_group_rule(self.fmt, rule)
                     self.deserialize(self.fmt, res)
-                    self.assertEqual(res.status_int, 400)
+                    self.assertEqual(res.status_int,
+                                     webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_ports_but_no_protocol(self):
         name = 'webservers'
@@ -843,7 +848,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_port_range_min_only(self):
         name = 'webservers'
@@ -852,11 +857,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '22', None)
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', None)
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_port_range_max_only(self):
         name = 'webservers'
@@ -865,11 +871,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', None, '22')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, None, '22')
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_icmp_type_too_big(self):
         name = 'webservers'
@@ -878,11 +885,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'icmp', '256', None)
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_ICMP, '256', None)
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_icmp_code_too_big(self):
         name = 'webservers'
@@ -891,11 +899,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             security_group_id = sg['security_group']['id']
             with self.security_group_rule(security_group_id):
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'icmp', '8', '256')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_ICMP, '8', '256')
                 self._create_security_group_rule(self.fmt, rule)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_list_ports_security_group(self):
         with self.network() as n:
@@ -1107,7 +1116,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                         security_groups=['bad_id'])
 
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_delete_security_group_port_in_use(self):
         with self.network() as n:
@@ -1121,7 +1130,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                      sg['security_group']['id'])
                     # try to delete security group that's in use
                     res = self._delete('security-groups',
-                                       sg['security_group']['id'], 409)
+                                       sg['security_group']['id'],
+                                       webob.exc.HTTPConflict.code)
                     # delete the blocking port
                     self._delete('ports', port['port']['id'])
 
@@ -1131,16 +1141,18 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                           "security_group_rule create")
         with self.security_group() as sg:
             rule1 = self._build_security_group_rule(sg['security_group']['id'],
-                                                    'ingress', 'tcp', '22',
+                                                    'ingress',
+                                                    const.PROTO_NAME_TCP, '22',
                                                     '22', '10.0.0.1/24')
             rule2 = self._build_security_group_rule(sg['security_group']['id'],
-                                                    'ingress', 'tcp', '23',
+                                                    'ingress',
+                                                    const.PROTO_NAME_TCP, '23',
                                                     '23', '10.0.0.1/24')
             rules = {'security_group_rules': [rule1['security_group_rule'],
                                               rule2['security_group_rule']]}
             res = self._create_security_group_rule(self.fmt, rules)
             self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 201)
+            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
     def test_create_security_group_rule_bulk_emulated(self):
         real_has_attr = hasattr
@@ -1155,17 +1167,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                         new=fakehasattr):
             with self.security_group() as sg:
                 rule1 = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
-                    '10.0.0.1/24')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
                 rule2 = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
-                    '10.0.0.1/24')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
                 rules = {'security_group_rules': [rule1['security_group_rule'],
                                                   rule2['security_group_rule']]
                          }
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
     def test_create_security_group_rule_duplicate_rule_in_post(self):
         if self._skip_native_bulk:
@@ -1173,13 +1185,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                           "security_group_rule create")
         with self.security_group() as sg:
             rule = self._build_security_group_rule(sg['security_group']['id'],
-                                                   'ingress', 'tcp', '22',
+                                                   'ingress',
+                                                   const.PROTO_NAME_TCP, '22',
                                                    '22', '10.0.0.1/24')
             rules = {'security_group_rules': [rule['security_group_rule'],
                                               rule['security_group_rule']]}
             res = self._create_security_group_rule(self.fmt, rules)
             rule = self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 409)
+            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
         real_has_attr = hasattr
@@ -1195,13 +1208,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
 
             with self.security_group() as sg:
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
-                    '10.0.0.1/24')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
                 rules = {'security_group_rules': [rule['security_group_rule'],
                                                   rule['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 rule = self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 409)
+                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_create_security_group_rule_duplicate_rule_db(self):
         if self._skip_native_bulk:
@@ -1209,13 +1222,14 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                           "security_group_rule create")
         with self.security_group() as sg:
             rule = self._build_security_group_rule(sg['security_group']['id'],
-                                                   'ingress', 'tcp', '22',
+                                                   'ingress',
+                                                   const.PROTO_NAME_TCP, '22',
                                                    '22', '10.0.0.1/24')
             rules = {'security_group_rules': [rule]}
             self._create_security_group_rule(self.fmt, rules)
             res = self._create_security_group_rule(self.fmt, rules)
             rule = self.deserialize(self.fmt, res)
-            self.assertEqual(res.status_int, 409)
+            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_create_security_group_rule_duplicate_rule_db_emulated(self):
         real_has_attr = hasattr
@@ -1230,13 +1244,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                         new=fakehasattr):
             with self.security_group() as sg:
                 rule = self._build_security_group_rule(
-                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
-                    '10.0.0.1/24')
+                    sg['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
                 rules = {'security_group_rules': [rule]}
                 self._create_security_group_rule(self.fmt, rules)
                 res = self._create_security_group_rule(self.fmt, rule)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 409)
+                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
 
     def test_create_security_group_rule_differnt_security_group_ids(self):
         if self._skip_native_bulk:
@@ -1245,24 +1259,24 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
         with self.security_group() as sg1:
             with self.security_group() as sg2:
                 rule1 = self._build_security_group_rule(
-                    sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
-                    '10.0.0.1/24')
+                    sg1['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')
                 rule2 = self._build_security_group_rule(
-                    sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
-                    '10.0.0.1/24')
+                    sg2['security_group']['id'], 'ingress',
+                    const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')
 
                 rules = {'security_group_rules': [rule1['security_group_rule'],
                                                   rule2['security_group_rule']]
                          }
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_with_invalid_ethertype(self):
         security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
         direction = "ingress"
         remote_ip_prefix = "10.0.0.0/24"
-        protocol = 'tcp'
+        protocol = const.PROTO_NAME_TCP
         port_range_min = 22
         port_range_max = 22
         remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
@@ -1274,7 +1288,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                ethertype='IPv5')
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 400)
+        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_security_group_rule_with_invalid_protocol(self):
         security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
@@ -1291,7 +1305,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                                remote_group_id)
         res = self._create_security_group_rule(self.fmt, rule)
         self.deserialize(self.fmt, res)
-        self.assertEqual(res.status_int, 400)
+        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
     def test_create_port_with_non_uuid(self):
         with self.network() as n:
@@ -1300,7 +1314,7 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                                         security_groups=['not_valid'])
 
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 400)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
 
 class TestSecurityGroupsXML(TestSecurityGroups):
index 02f50777a827e88813e240cf4af3cfe96643e18b..ca98a55be911d04f95e01209313364b038e59771 100644 (file)
@@ -21,11 +21,13 @@ import mock
 from mock import call
 import mox
 from oslo.config import cfg
+import webob.exc
 
 from neutron.agent import firewall as firewall_base
 from neutron.agent.linux import iptables_manager
 from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.common import constants as const
 from neutron import context
 from neutron.db import securitygroups_rpc_base as sg_db_rpc
 from neutron.extensions import allowedaddresspairs as addr_pair
@@ -54,7 +56,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
         self.rpc = FakeSGCallback()
 
     def test_security_group_rules_for_devices_ipv4_ingress(self):
-        fake_prefix = test_fw.FAKE_PREFIX['IPv4']
+        fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
         with self.network() as n:
             with nested(self.subnet(n),
                         self.security_group()) as (subnet_v4,
@@ -62,18 +64,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg1_id = sg1['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'tcp', '22',
+                    'ingress', const.PROTO_NAME_TCP, '22',
                     '22')
                 rule2 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'tcp', '23',
+                    'ingress', const.PROTO_NAME_TCP, '23',
                     '23', fake_prefix)
                 rules = {
                     'security_group_rules': [rule1['security_group_rule'],
                                              rule2['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -86,17 +88,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
                             {'direction': 'ingress',
-                             'protocol': 'tcp', 'ethertype': 'IPv4',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv4,
                              'port_range_max': 22,
                              'security_group_id': sg1_id,
                              'port_range_min': 22},
-                            {'direction': 'ingress', 'protocol': 'tcp',
-                             'ethertype': 'IPv4',
+                            {'direction': 'ingress',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv4,
                              'port_range_max': 23, 'security_group_id': sg1_id,
                              'port_range_min': 23,
                              'source_ip_prefix': fake_prefix},
@@ -169,7 +173,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 self._delete('ports', port_id1)
 
     def test_security_group_rules_for_devices_ipv4_egress(self):
-        fake_prefix = test_fw.FAKE_PREFIX['IPv4']
+        fake_prefix = test_fw.FAKE_PREFIX[const.IPv4]
         with self.network() as n:
             with nested(self.subnet(n),
                         self.security_group()) as (subnet_v4,
@@ -177,18 +181,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg1_id = sg1['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'egress', 'tcp', '22',
+                    'egress', const.PROTO_NAME_TCP, '22',
                     '22')
                 rule2 = self._build_security_group_rule(
                     sg1_id,
-                    'egress', 'udp', '23',
+                    'egress', const.PROTO_NAME_UDP, '23',
                     '23', fake_prefix)
                 rules = {
                     'security_group_rules': [rule1['security_group_rule'],
                                              rule2['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -201,17 +205,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
                             {'direction': 'egress',
-                             'protocol': 'tcp', 'ethertype': 'IPv4',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv4,
                              'port_range_max': 22,
                              'security_group_id': sg1_id,
                              'port_range_min': 22},
-                            {'direction': 'egress', 'protocol': 'udp',
-                             'ethertype': 'IPv4',
+                            {'direction': 'egress',
+                             'protocol': const.PROTO_NAME_UDP,
+                             'ethertype': const.IPv4,
                              'port_range_max': 23, 'security_group_id': sg1_id,
                              'port_range_min': 23,
                              'dest_ip_prefix': fake_prefix},
@@ -232,13 +238,13 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg2_id = sg2['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'tcp', '24',
+                    'ingress', const.PROTO_NAME_TCP, '24',
                     '25', remote_group_id=sg2['security_group']['id'])
                 rules = {
                     'security_group_rules': [rule1['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -258,17 +264,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv4',
+                            {'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg2_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg2_id},
                             {'direction': u'ingress',
                              'source_ip_prefix': u'10.0.0.3/32',
-                             'protocol': u'tcp', 'ethertype': u'IPv4',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv4,
                              'port_range_max': 25, 'port_range_min': 24,
                              'remote_group_id': sg2_id,
                              'security_group_id': sg1_id},
@@ -279,7 +286,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 self._delete('ports', port_id2)
 
     def test_security_group_rules_for_devices_ipv6_ingress(self):
-        fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+        fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
         with self.network() as n:
             with nested(self.subnet(n,
                                     cidr=fake_prefix,
@@ -289,20 +296,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg1_id = sg1['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'tcp', '22',
+                    'ingress', const.PROTO_NAME_TCP, '22',
                     '22',
-                    ethertype='IPv6')
+                    ethertype=const.IPv6)
                 rule2 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'udp', '23',
+                    'ingress', const.PROTO_NAME_UDP, '23',
                     '23', fake_prefix,
-                    ethertype='IPv6')
+                    ethertype=const.IPv6)
                 rules = {
                     'security_group_rules': [rule1['security_group_rule'],
                                              rule2['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -316,17 +323,19 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
                             {'direction': 'ingress',
-                             'protocol': 'tcp', 'ethertype': 'IPv6',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv6,
                              'port_range_max': 22,
                              'security_group_id': sg1_id,
                              'port_range_min': 22},
-                            {'direction': 'ingress', 'protocol': 'udp',
-                             'ethertype': 'IPv6',
+                            {'direction': 'ingress',
+                             'protocol': const.PROTO_NAME_UDP,
+                             'ethertype': const.IPv6,
                              'port_range_max': 23, 'security_group_id': sg1_id,
                              'port_range_min': 23,
                              'source_ip_prefix': fake_prefix},
@@ -336,7 +345,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 self._delete('ports', port_id1)
 
     def test_security_group_rules_for_devices_ipv6_egress(self):
-        fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+        fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
         with self.network() as n:
             with nested(self.subnet(n,
                                     cidr=fake_prefix,
@@ -346,20 +355,20 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg1_id = sg1['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'egress', 'tcp', '22',
+                    'egress', const.PROTO_NAME_TCP, '22',
                     '22',
-                    ethertype='IPv6')
+                    ethertype=const.IPv6)
                 rule2 = self._build_security_group_rule(
                     sg1_id,
-                    'egress', 'udp', '23',
+                    'egress', const.PROTO_NAME_UDP, '23',
                     '23', fake_prefix,
-                    ethertype='IPv6')
+                    ethertype=const.IPv6)
                 rules = {
                     'security_group_rules': [rule1['security_group_rule'],
                                              rule2['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -374,18 +383,21 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
                             {'direction': 'egress',
-                             'protocol': 'tcp', 'ethertype': 'IPv6',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv6,
                              'port_range_max': 22,
                              'security_group_id': sg1_id,
                              'port_range_min': 22},
-                            {'direction': 'egress', 'protocol': 'udp',
-                             'ethertype': 'IPv6',
-                             'port_range_max': 23, 'security_group_id': sg1_id,
+                            {'direction': 'egress',
+                             'protocol': const.PROTO_NAME_UDP,
+                             'ethertype': const.IPv6,
+                             'port_range_max': 23,
+                             'security_group_id': sg1_id,
                              'port_range_min': 23,
                              'dest_ip_prefix': fake_prefix},
                             ]
@@ -394,7 +406,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 self._delete('ports', port_id1)
 
     def test_security_group_rules_for_devices_ipv6_source_group(self):
-        fake_prefix = test_fw.FAKE_PREFIX['IPv6']
+        fake_prefix = test_fw.FAKE_PREFIX[const.IPv6]
         with self.network() as n:
             with nested(self.subnet(n,
                                     cidr=fake_prefix,
@@ -407,15 +419,15 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 sg2_id = sg2['security_group']['id']
                 rule1 = self._build_security_group_rule(
                     sg1_id,
-                    'ingress', 'tcp', '24',
+                    'ingress', const.PROTO_NAME_TCP, '24',
                     '25',
-                    ethertype='IPv6',
+                    ethertype=const.IPv6,
                     remote_group_id=sg2['security_group']['id'])
                 rules = {
                     'security_group_rules': [rule1['security_group_rule']]}
                 res = self._create_security_group_rule(self.fmt, rules)
                 self.deserialize(self.fmt, res)
-                self.assertEqual(res.status_int, 201)
+                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
                 res1 = self._create_port(
                     self.fmt, n['network']['id'],
@@ -438,17 +450,18 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
                 ports_rpc = self.rpc.security_group_rules_for_devices(
                     ctx, devices=devices)
                 port_rpc = ports_rpc[port_id1]
-                expected = [{'direction': 'egress', 'ethertype': 'IPv4',
+                expected = [{'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg1_id},
-                            {'direction': 'egress', 'ethertype': 'IPv4',
+                            {'direction': 'egress', 'ethertype': const.IPv4,
                              'security_group_id': sg2_id},
-                            {'direction': 'egress', 'ethertype': 'IPv6',
+                            {'direction': 'egress', 'ethertype': const.IPv6,
                              'security_group_id': sg2_id},
                             {'direction': 'ingress',
                              'source_ip_prefix': 'fe80::3/128',
-                             'protocol': 'tcp', 'ethertype': 'IPv6',
+                             'protocol': const.PROTO_NAME_TCP,
+                             'ethertype': const.IPv6,
                              'port_range_max': 25, 'port_range_min': 24,
                              'remote_group_id': sg2_id,
                              'security_group_id': sg1_id},
@@ -1220,36 +1233,36 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
         self.rpc = mock.Mock()
         self.agent.plugin_rpc = self.rpc
         rule1 = [{'direction': 'ingress',
-                  'protocol': 'udp',
-                  'ethertype': 'IPv4',
+                  'protocol': const.PROTO_NAME_UDP,
+                  'ethertype': const.IPv4,
                   'source_ip_prefix': '10.0.0.2',
                   'source_port_range_min': 67,
                   'source_port_range_max': 67,
                   'port_range_min': 68,
                   'port_range_max': 68},
                  {'direction': 'ingress',
-                  'protocol': 'tcp',
-                  'ethertype': 'IPv4',
+                  'protocol': const.PROTO_NAME_TCP,
+                  'ethertype': const.IPv4,
                   'port_range_min': 22,
                   'port_range_max': 22},
                  {'direction': 'egress',
-                  'ethertype': 'IPv4'}]
+                  'ethertype': const.IPv4}]
         rule2 = rule1[:]
         rule2 += [{'direction': 'ingress',
                   'source_ip_prefix': '10.0.0.4',
-                  'ethertype': 'IPv4'}]
+                  'ethertype': const.IPv4}]
         rule3 = rule2[:]
         rule3 += [{'direction': 'ingress',
-                  'protocol': 'icmp',
-                  'ethertype': 'IPv4'}]
+                  'protocol': const.PROTO_NAME_ICMP,
+                  'ethertype': const.IPv4}]
         rule4 = rule1[:]
         rule4 += [{'direction': 'ingress',
                   'source_ip_prefix': '10.0.0.3',
-                  'ethertype': 'IPv4'}]
+                  'ethertype': const.IPv4}]
         rule5 = rule4[:]
         rule5 += [{'direction': 'ingress',
-                  'protocol': 'icmp',
-                  'ethertype': 'IPv4'}]
+                  'protocol': const.PROTO_NAME_ICMP,
+                  'ethertype': const.IPv4}]
         self.devices1 = {'tap_port1': self._device('tap_port1',
                                                    '10.0.0.3',
                                                    '12:34:56:78:9a:bc',
@@ -1360,7 +1373,7 @@ class SGNotificationTestMixin():
                 security_group_id = sg['security_group']['id']
                 direction = "ingress"
                 remote_group_id = sg2['security_group']['id']
-                protocol = 'tcp'
+                protocol = const.PROTO_NAME_TCP
                 port_range_min = 88
                 port_range_max = 88
                 with self.security_group_rule(security_group_id, direction,