]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Make ipsec_site_connection dpd_timeout == dpd_interval return 400
authorNachi Ueno <nachi@ntti3.com>
Sun, 1 Sep 2013 17:24:16 +0000 (10:24 -0700)
committerPaul Michali <pcm@cisco.com>
Mon, 2 Sep 2013 11:20:22 +0000 (07:20 -0400)
dpd_timeout == dpd_interval is a invalid case,
so in this commit, we modify validation and added test.
Fixes bug 1219440

Change-Id: I14fb9aa7df890f9c5a27f18f20d7dc1a316b2d79

neutron/db/vpn/vpn_db.py
neutron/extensions/vpnaas.py
neutron/tests/unit/db/vpn/test_db_vpnaas.py

index 1a309a51d2ac17f98e6c30813c0f63ef4cabd740..727ee85ccabb25761a8fa97c9dd8eeabd8463183 100644 (file)
@@ -229,9 +229,7 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
         ipsec_sitecon['dpd_interval'] = dpd.get('interval', 30)
         ipsec_sitecon['dpd_timeout'] = dpd.get('timeout', 120)
         tenant_id = self._get_tenant_id_for_create(context, ipsec_sitecon)
-        if ipsec_sitecon['dpd_timeout'] < ipsec_sitecon['dpd_interval']:
-            raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
-                attribute_a='dpd_timeout')
+        self._check_dpd(ipsec_sitecon)
         with context.session.begin(subtransactions=True):
             #Check permissions
             self._get_resource(context,
@@ -273,31 +271,40 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
                 context.session.add(peer_cidr_db)
         return self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
 
+    def _check_dpd(self, ipsec_sitecon):
+        if ipsec_sitecon['dpd_timeout'] <= ipsec_sitecon['dpd_interval']:
+            raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
+                attr='dpd_timeout')
+
     def update_ipsec_site_connection(
             self, context,
             ipsec_site_conn_id, ipsec_site_connection):
-        ipsec_sitecon = ipsec_site_connection['ipsec_site_connection']
-        dpd = ipsec_sitecon.get('dpd', {})
-        if dpd.get('action'):
-            ipsec_sitecon['dpd_action'] = dpd.get('action')
-        if dpd.get('interval'):
-            ipsec_sitecon['dpd_interval'] = dpd.get('interval')
-        if dpd.get('timeout'):
-            ipsec_sitecon['dpd_timeout'] = dpd.get('timeout')
+        conn = ipsec_site_connection['ipsec_site_connection']
         changed_peer_cidrs = False
         with context.session.begin(subtransactions=True):
             ipsec_site_conn_db = self._get_resource(
                 context,
                 IPsecSiteConnection,
                 ipsec_site_conn_id)
+            dpd = conn.get('dpd', {})
+            if dpd.get('action'):
+                conn['dpd_action'] = dpd.get('action')
+            if dpd.get('interval') or dpd.get('timeout'):
+                conn['dpd_interval'] = dpd.get(
+                    'interval', ipsec_site_conn_db.dpd_interval)
+                conn['dpd_timeout'] = dpd.get(
+                    'timeout', ipsec_site_conn_db.dpd_timeout)
+                self._check_dpd(conn)
+
             self.assert_update_allowed(ipsec_site_conn_db)
-            if "peer_cidrs" in ipsec_sitecon:
+
+            if "peer_cidrs" in conn:
                 changed_peer_cidrs = True
                 old_peer_cidr_list = ipsec_site_conn_db['peer_cidrs']
                 old_peer_cidr_dict = dict(
                     (peer_cidr['cidr'], peer_cidr)
                     for peer_cidr in old_peer_cidr_list)
-                new_peer_cidr_set = set(ipsec_sitecon["peer_cidrs"])
+                new_peer_cidr_set = set(conn["peer_cidrs"])
                 old_peer_cidr_set = set(old_peer_cidr_dict)
 
                 new_peer_cidrs = list(new_peer_cidr_set)
@@ -308,9 +315,9 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
                         cidr=peer_cidr,
                         ipsec_site_connection_id=ipsec_site_conn_id)
                     context.session.add(pcidr)
-                del ipsec_sitecon["peer_cidrs"]
-            if ipsec_sitecon:
-                ipsec_site_conn_db.update(ipsec_sitecon)
+                del conn["peer_cidrs"]
+            if conn:
+                ipsec_site_conn_db.update(conn)
         result = self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
         if changed_peer_cidrs:
             result['peer_cidrs'] = new_peer_cidrs
index d607c8f4868f3a7e777722a6308f9ccb6bc2d65a..b02890d24e4b3cd41e5d1f4039312d826da80994 100644 (file)
@@ -40,7 +40,8 @@ class IPsecSiteConnectionNotFound(qexception.NotFound):
 
 
 class IPsecSiteConnectionDpdIntervalValueError(qexception.InvalidInput):
-    message = _("ipsec_site_connection %(attribute_a)s less than dpd_interval")
+    message = _("ipsec_site_connection %(attr)s is "
+                "equal to or less than dpd_interval")
 
 
 class IKEPolicyNotFound(qexception.NotFound):
index c17ea08be78aa2c9c248018ee4049767641555f2..17ba5d1a9be2c36e9bac8ed365171d7e4b1c1d15 100644 (file)
@@ -956,6 +956,11 @@ class TestVpnaas(VPNPluginDbTestCase):
             name=name,
             dpd_interval=30,
             dpd_timeout=20, expected_status_int=400)
+        self._create_ipsec_site_connection(
+            fmt=self.fmt,
+            name=name,
+            dpd_interval=100,
+            dpd_timeout=100, expected_status_int=400)
 
     def test_create_ipsec_site_connection(self, **extras):
         """Test case to create an ipsec_site_connection."""
@@ -1040,6 +1045,35 @@ class TestVpnaas(VPNPluginDbTestCase):
             self.assertEqual(res.status_int, 204)
 
     def test_update_ipsec_site_connection(self):
+        dpd = {'action': 'hold',
+               'interval': 40,
+               'timeout': 120}
+        self._test_update_ipsec_site_connection(
+            update={'dpd': dpd}
+        )
+
+    def test_update_ipsec_site_connection_with_invalid_dpd(self):
+        dpd1 = {'action': 'hold',
+                'interval': 100,
+                'timeout': 100}
+        self._test_update_ipsec_site_connection(
+            update={'dpd': dpd1},
+            expected_status_int=400)
+        dpd2 = {'action': 'hold',
+                'interval': 100,
+                'timeout': 60}
+        self._test_update_ipsec_site_connection(
+            update={'dpd': dpd2},
+            expected_status_int=400)
+        dpd3 = {'action': 'hold',
+                'interval': -50,
+                'timeout': -100}
+        self._test_update_ipsec_site_connection(
+            update={'dpd': dpd3},
+            expected_status_int=400)
+
+    def _test_update_ipsec_site_connection(
+        self, update=None, expected_status_int=200):
         """Test case to update a ipsec_site_connection."""
         name = 'new_ipsec_site_connection'
         ikename = 'ikepolicy1'
@@ -1095,7 +1129,9 @@ class TestVpnaas(VPNPluginDbTestCase):
                     keys['admin_state_up'],
                     description=description
                 ) as ipsec_site_connection:
-                    data = {'ipsec_site_connection': {'name': name}}
+                    if not update:
+                        update = {'name': name}
+                    data = {'ipsec_site_connection': update}
                     self._set_active(
                         vpn_db.IPsecSiteConnection,
                         ipsec_site_connection['ipsec_site_connection']['id'])
@@ -1104,12 +1140,16 @@ class TestVpnaas(VPNPluginDbTestCase):
                         data,
                         ipsec_site_connection['ipsec_site_connection']['id']
                     )
-                    res = self.deserialize(
-                        self.fmt,
-                        req.get_response(self.ext_api)
-                    )
-                    for k, v in keys.items():
-                        self.assertEqual(res['ipsec_site_connection'][k], v)
+                    res = req.get_response(self.ext_api)
+                    self.assertEqual(expected_status_int, res.status_int)
+                    if expected_status_int == 200:
+                        res_dict = self.deserialize(
+                            self.fmt,
+                            res
+                        )
+                        for k, v in update.items():
+                            self.assertEqual(
+                                res_dict['ipsec_site_connection'][k], v)
 
     def test_update_ipsec_site_connection_with_invalid_state(self):
         """Test case to update an ipsec_site_connection in invalid state."""