]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Abstract sync_allocations
authorCedric Brandily <zzelle@gmail.com>
Mon, 15 Jun 2015 20:20:16 +0000 (22:20 +0200)
committerCedric Brandily <zzelle@gmail.com>
Thu, 25 Jun 2015 09:12:54 +0000 (09:12 +0000)
Currently gre/vxlan type drivers have specific sync_allocations
implementations to perform the same feature. This change abstracts
(vxlan) implementation and moves it to TunnelTypeDriver[1] in order to
share implementation between gre/vxlan; this change also adds some
unittests.

Current gre/vxlan and new implementations of sync_allocations are not
Galera-compliant with multi-writers, a follow-up will update it to
fully support Galera.

[1] neutron.plugins.ml2.drivers.type_tunnel

Change-Id: I188a7cf718d811084475f6783d844588de5d60ea

neutron/plugins/ml2/drivers/type_gre.py
neutron/plugins/ml2/drivers/type_tunnel.py
neutron/plugins/ml2/drivers/type_vxlan.py
neutron/tests/unit/plugins/ml2/drivers/base_type_tunnel.py
neutron/tests/unit/plugins/ml2/drivers/test_type_gre.py

index 5db7074c73c48819729139a40ff8e749bcfed26b..53b907c884c54986f16cf3db0287a50f3028cfee 100644 (file)
 #    under the License.
 
 from oslo_config import cfg
-from oslo_db import exception as db_exc
 from oslo_log import log
-from six import moves
 import sqlalchemy as sa
 from sqlalchemy import sql
 
 from neutron.common import exceptions as n_exc
-from neutron.db import api as db_api
 from neutron.db import model_base
-from neutron.i18n import _LE, _LW
+from neutron.i18n import _LE
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2.drivers import type_tunnel
 
@@ -83,44 +80,6 @@ class GreTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
                               "Service terminated!"))
             raise SystemExit()
 
-    def sync_allocations(self):
-
-        # determine current configured allocatable gres
-        gre_ids = set()
-        for gre_id_range in self.tunnel_ranges:
-            tun_min, tun_max = gre_id_range
-            gre_ids |= set(moves.range(tun_min, tun_max + 1))
-
-        session = db_api.get_session()
-        try:
-            self._add_allocation(session, gre_ids)
-        except db_exc.DBDuplicateEntry:
-            # in case multiple neutron-servers start allocations could be
-            # already added by different neutron-server. because this function
-            # is called only when initializing this type driver, it's safe to
-            # assume allocations were added.
-            LOG.warning(_LW("Gre allocations were already created."))
-
-    def _add_allocation(self, session, gre_ids):
-        with session.begin(subtransactions=True):
-            # remove from table unallocated tunnels not currently allocatable
-            allocs = (session.query(GreAllocation).all())
-            for alloc in allocs:
-                try:
-                    # see if tunnel is allocatable
-                    gre_ids.remove(alloc.gre_id)
-                except KeyError:
-                    # it's not allocatable, so check if its allocated
-                    if not alloc.allocated:
-                        # it's not, so remove it from table
-                        LOG.debug("Removing tunnel %s from pool", alloc.gre_id)
-                        session.delete(alloc)
-
-            # add missing allocatable tunnels to table
-            for gre_id in sorted(gre_ids):
-                alloc = GreAllocation(gre_id=gre_id)
-                session.add(alloc)
-
     def get_endpoints(self):
         """Get every gre endpoints from database."""
         gre_endpoints = self._get_endpoints()
index 258e78c26445c4774b6d259c608a9a6758da6502..054d546801d8934603570645a3006866e6506bec 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 import abc
+import itertools
+import operator
 
 from oslo_config import cfg
 from oslo_db import exception as db_exc
 from oslo_log import log
+from six import moves
 
 from neutron.common import exceptions as exc
 from neutron.common import topics
@@ -31,21 +34,27 @@ LOG = log.getLogger(__name__)
 TUNNEL = 'tunnel'
 
 
+def chunks(iterable, chunk_size):
+    """Chunks data into chunk with size<=chunk_size."""
+    iterator = iter(iterable)
+    chunk = list(itertools.islice(iterator, 0, chunk_size))
+    while chunk:
+        yield chunk
+        chunk = list(itertools.islice(iterator, 0, chunk_size))
+
+
 class TunnelTypeDriver(helpers.SegmentTypeDriver):
     """Define stable abstract interface for ML2 type drivers.
 
     tunnel type networks rely on tunnel endpoints. This class defines abstract
     methods to manage these endpoints.
     """
+    BULK_SIZE = 100
 
     def __init__(self, model):
         super(TunnelTypeDriver, self).__init__(model)
         self.segmentation_key = next(iter(self.primary_keys))
 
-    @abc.abstractmethod
-    def sync_allocations(self):
-        """Synchronize type_driver allocation table with configured ranges."""
-
     @abc.abstractmethod
     def add_endpoint(self, ip, host):
         """Register the endpoint in the type_driver database.
@@ -113,6 +122,40 @@ class TunnelTypeDriver(helpers.SegmentTypeDriver):
         LOG.info(_LI("%(type)s ID ranges: %(range)s"),
                  {'type': self.get_type(), 'range': current_range})
 
+    def sync_allocations(self):
+        # determine current configured allocatable tunnel ids
+        tunnel_ids = set()
+        for tun_min, tun_max in self.tunnel_ranges:
+            tunnel_ids |= set(moves.range(tun_min, tun_max + 1))
+
+        tunnel_id_getter = operator.attrgetter(self.segmentation_key)
+        tunnel_col = getattr(self.model, self.segmentation_key)
+        session = db_api.get_session()
+        with session.begin(subtransactions=True):
+            # remove from table unallocated tunnels not currently allocatable
+            # fetch results as list via all() because we'll be iterating
+            # through them twice
+            allocs = (session.query(self.model).
+                      with_lockmode("update").all())
+
+            # collect those vnis that needs to be deleted from db
+            unallocateds = (
+                tunnel_id_getter(a) for a in allocs if not a.allocated)
+            to_remove = (x for x in unallocateds if x not in tunnel_ids)
+            # Immediately delete tunnels in chunks. This leaves no work for
+            # flush at the end of transaction
+            for chunk in chunks(to_remove, self.BULK_SIZE):
+                session.query(self.model).filter(
+                    tunnel_col.in_(chunk)).delete(synchronize_session=False)
+
+            # collect vnis that need to be added
+            existings = {tunnel_id_getter(a) for a in allocs}
+            missings = list(tunnel_ids - existings)
+            for chunk in chunks(missings, self.BULK_SIZE):
+                bulk = [{self.segmentation_key: x, 'allocated': False}
+                        for x in chunk]
+                session.execute(self.model.__table__.insert(), bulk)
+
     def is_partial_segment(self, segment):
         return segment.get(api.SEGMENTATION_ID) is None
 
index 52e5f7eaee79191a4f4114c3206e89cc12e70b03..c6f9dbf107308fa8a757b7a6d81567e2e32feb54 100644 (file)
 
 from oslo_config import cfg
 from oslo_log import log
-from six import moves
 import sqlalchemy as sa
 from sqlalchemy import sql
 
 from neutron.common import exceptions as n_exc
-from neutron.db import api as db_api
 from neutron.db import model_base
 from neutron.i18n import _LE
 from neutron.plugins.common import constants as p_const
@@ -86,45 +84,6 @@ class VxlanTypeDriver(type_tunnel.EndpointTunnelTypeDriver):
                               "Service terminated!"))
             raise SystemExit()
 
-    def sync_allocations(self):
-
-        # determine current configured allocatable vnis
-        vxlan_vnis = set()
-        for tun_min, tun_max in self.tunnel_ranges:
-            vxlan_vnis |= set(moves.range(tun_min, tun_max + 1))
-
-        session = db_api.get_session()
-        with session.begin(subtransactions=True):
-            # remove from table unallocated tunnels not currently allocatable
-            # fetch results as list via all() because we'll be iterating
-            # through them twice
-            allocs = (session.query(VxlanAllocation).
-                      with_lockmode("update").all())
-            # collect all vnis present in db
-            existing_vnis = set(alloc.vxlan_vni for alloc in allocs)
-            # collect those vnis that needs to be deleted from db
-            vnis_to_remove = [alloc.vxlan_vni for alloc in allocs
-                              if (alloc.vxlan_vni not in vxlan_vnis and
-                                  not alloc.allocated)]
-            # Immediately delete vnis in chunks. This leaves no work for
-            # flush at the end of transaction
-            bulk_size = 100
-            chunked_vnis = (vnis_to_remove[i:i + bulk_size] for i in
-                            range(0, len(vnis_to_remove), bulk_size))
-            for vni_list in chunked_vnis:
-                if vni_list:
-                    session.query(VxlanAllocation).filter(
-                        VxlanAllocation.vxlan_vni.in_(vni_list)).delete(
-                            synchronize_session=False)
-            # collect vnis that need to be added
-            vnis = list(vxlan_vnis - existing_vnis)
-            chunked_vnis = (vnis[i:i + bulk_size] for i in
-                            range(0, len(vnis), bulk_size))
-            for vni_list in chunked_vnis:
-                bulk = [{'vxlan_vni': vni, 'allocated': False}
-                        for vni in vni_list]
-                session.execute(VxlanAllocation.__table__.insert(), bulk)
-
     def get_endpoints(self):
         """Get every vxlan endpoints from database."""
         vxlan_endpoints = self._get_endpoints()
index 725fdaab18e51c7d8e387d4d84b2e55954a26062..5bbb3ec38dc93831d6b28a8f8ba8300b648194c3 100644 (file)
@@ -93,6 +93,35 @@ class TunnelTypeTestMixin(object):
         self.assertIsNone(
             self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1)))
 
+    def _test_sync_allocations_and_allocated(self, tunnel_id):
+        segment = {api.NETWORK_TYPE: self.TYPE,
+                   api.PHYSICAL_NETWORK: None,
+                   api.SEGMENTATION_ID: tunnel_id}
+        self.driver.reserve_provider_segment(self.session, segment)
+
+        self.driver.tunnel_ranges = UPDATED_TUNNEL_RANGES
+        self.driver.sync_allocations()
+
+        self.assertTrue(
+            self.driver.get_allocation(self.session, tunnel_id).allocated)
+
+    def test_sync_allocations_and_allocated_in_initial_range(self):
+        self._test_sync_allocations_and_allocated(TUN_MIN + 2)
+
+    def test_sync_allocations_and_allocated_in_final_range(self):
+        self._test_sync_allocations_and_allocated(TUN_MAX + 2)
+
+    def test_sync_allocations_no_op(self):
+
+        def verify_no_chunk(iterable, chunk_size):
+            # no segment removed/added
+            self.assertEqual(0, len(list(iterable)))
+            return []
+        with mock.patch.object(
+                type_tunnel, 'chunks', side_effect=verify_no_chunk) as chunks:
+            self.driver.sync_allocations()
+            self.assertEqual(2, len(chunks.mock_calls))
+
     def test_partial_segment_is_partial_segment(self):
         segment = {api.NETWORK_TYPE: self.TYPE,
                    api.PHYSICAL_NETWORK: None,
index ec4d342012b68ed72cd05aa0e721e8b6f3400326..0471c68ec41fb5b14e5e1fe122e115ba0263c8ac 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import mock
-
-from oslo_db import exception as db_exc
-from sqlalchemy.orm import exc as sa_exc
-import testtools
-
-from neutron.db import api as db_api
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2 import config
 from neutron.plugins.ml2.drivers import type_gre
@@ -62,32 +55,6 @@ class GreTypeTest(base_type_tunnel.TunnelTypeTestMixin,
             elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO:
                 self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host'])
 
-    def test_sync_allocations_entry_added_during_session(self):
-        with mock.patch.object(self.driver, '_add_allocation',
-                               side_effect=db_exc.DBDuplicateEntry) as (
-                mock_add_allocation):
-            self.driver.sync_allocations()
-            self.assertTrue(mock_add_allocation.called)
-
-    def test__add_allocation_not_existing(self):
-        session = db_api.get_session()
-        _add_allocation(session, gre_id=1)
-        self.driver._add_allocation(session, {1, 2})
-        _get_allocation(session, 2)
-
-    def test__add_allocation_existing_allocated_is_kept(self):
-        session = db_api.get_session()
-        _add_allocation(session, gre_id=1, allocated=True)
-        self.driver._add_allocation(session, {2})
-        _get_allocation(session, 1)
-
-    def test__add_allocation_existing_not_allocated_is_removed(self):
-        session = db_api.get_session()
-        _add_allocation(session, gre_id=1)
-        self.driver._add_allocation(session, {2})
-        with testtools.ExpectedException(sa_exc.NoResultFound):
-            _get_allocation(session, 1)
-
     def test_get_mtu(self):
         config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2')
         config.cfg.CONF.set_override('path_mtu', 1475, group='ml2')