# under the License.
from oslo_config import cfg
-from oslo_db import exception as db_exc
from oslo_log import log
-from six import moves
import sqlalchemy as sa
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
-from neutron.db import api as db_api
from neutron.db import model_base
-from neutron.i18n import _LE, _LW
+from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_tunnel
"Service terminated!"))
raise SystemExit()
- def sync_allocations(self):
-
- # determine current configured allocatable gres
- gre_ids = set()
- for gre_id_range in self.tunnel_ranges:
- tun_min, tun_max = gre_id_range
- gre_ids |= set(moves.range(tun_min, tun_max + 1))
-
- session = db_api.get_session()
- try:
- self._add_allocation(session, gre_ids)
- except db_exc.DBDuplicateEntry:
- # in case multiple neutron-servers start allocations could be
- # already added by different neutron-server. because this function
- # is called only when initializing this type driver, it's safe to
- # assume allocations were added.
- LOG.warning(_LW("Gre allocations were already created."))
-
- def _add_allocation(self, session, gre_ids):
- with session.begin(subtransactions=True):
- # remove from table unallocated tunnels not currently allocatable
- allocs = (session.query(GreAllocation).all())
- for alloc in allocs:
- try:
- # see if tunnel is allocatable
- gre_ids.remove(alloc.gre_id)
- except KeyError:
- # it's not allocatable, so check if its allocated
- if not alloc.allocated:
- # it's not, so remove it from table
- LOG.debug("Removing tunnel %s from pool", alloc.gre_id)
- session.delete(alloc)
-
- # add missing allocatable tunnels to table
- for gre_id in sorted(gre_ids):
- alloc = GreAllocation(gre_id=gre_id)
- session.add(alloc)
-
def get_endpoints(self):
"""Get every gre endpoints from database."""
gre_endpoints = self._get_endpoints()
# License for the specific language governing permissions and limitations
# under the License.
import abc
+import itertools
+import operator
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
+from six import moves
from neutron.common import exceptions as exc
from neutron.common import topics
TUNNEL = 'tunnel'
+def chunks(iterable, chunk_size):
+ """Chunks data into chunk with size<=chunk_size."""
+ iterator = iter(iterable)
+ chunk = list(itertools.islice(iterator, 0, chunk_size))
+ while chunk:
+ yield chunk
+ chunk = list(itertools.islice(iterator, 0, chunk_size))
+
+
class TunnelTypeDriver(helpers.SegmentTypeDriver):
"""Define stable abstract interface for ML2 type drivers.
tunnel type networks rely on tunnel endpoints. This class defines abstract
methods to manage these endpoints.
"""
+ BULK_SIZE = 100
def __init__(self, model):
super(TunnelTypeDriver, self).__init__(model)
self.segmentation_key = next(iter(self.primary_keys))
- @abc.abstractmethod
- def sync_allocations(self):
- """Synchronize type_driver allocation table with configured ranges."""
-
@abc.abstractmethod
def add_endpoint(self, ip, host):
"""Register the endpoint in the type_driver database.
LOG.info(_LI("%(type)s ID ranges: %(range)s"),
{'type': self.get_type(), 'range': current_range})
+ def sync_allocations(self):
+ # determine current configured allocatable tunnel ids
+ tunnel_ids = set()
+ for tun_min, tun_max in self.tunnel_ranges:
+ tunnel_ids |= set(moves.range(tun_min, tun_max + 1))
+
+ tunnel_id_getter = operator.attrgetter(self.segmentation_key)
+ tunnel_col = getattr(self.model, self.segmentation_key)
+ session = db_api.get_session()
+ with session.begin(subtransactions=True):
+ # remove from table unallocated tunnels not currently allocatable
+ # fetch results as list via all() because we'll be iterating
+ # through them twice
+ allocs = (session.query(self.model).
+ with_lockmode("update").all())
+
+ # collect those vnis that needs to be deleted from db
+ unallocateds = (
+ tunnel_id_getter(a) for a in allocs if not a.allocated)
+ to_remove = (x for x in unallocateds if x not in tunnel_ids)
+ # Immediately delete tunnels in chunks. This leaves no work for
+ # flush at the end of transaction
+ for chunk in chunks(to_remove, self.BULK_SIZE):
+ session.query(self.model).filter(
+ tunnel_col.in_(chunk)).delete(synchronize_session=False)
+
+ # collect vnis that need to be added
+ existings = {tunnel_id_getter(a) for a in allocs}
+ missings = list(tunnel_ids - existings)
+ for chunk in chunks(missings, self.BULK_SIZE):
+ bulk = [{self.segmentation_key: x, 'allocated': False}
+ for x in chunk]
+ session.execute(self.model.__table__.insert(), bulk)
+
def is_partial_segment(self, segment):
return segment.get(api.SEGMENTATION_ID) is None
from oslo_config import cfg
from oslo_log import log
-from six import moves
import sqlalchemy as sa
from sqlalchemy import sql
from neutron.common import exceptions as n_exc
-from neutron.db import api as db_api
from neutron.db import model_base
from neutron.i18n import _LE
from neutron.plugins.common import constants as p_const
"Service terminated!"))
raise SystemExit()
- def sync_allocations(self):
-
- # determine current configured allocatable vnis
- vxlan_vnis = set()
- for tun_min, tun_max in self.tunnel_ranges:
- vxlan_vnis |= set(moves.range(tun_min, tun_max + 1))
-
- session = db_api.get_session()
- with session.begin(subtransactions=True):
- # remove from table unallocated tunnels not currently allocatable
- # fetch results as list via all() because we'll be iterating
- # through them twice
- allocs = (session.query(VxlanAllocation).
- with_lockmode("update").all())
- # collect all vnis present in db
- existing_vnis = set(alloc.vxlan_vni for alloc in allocs)
- # collect those vnis that needs to be deleted from db
- vnis_to_remove = [alloc.vxlan_vni for alloc in allocs
- if (alloc.vxlan_vni not in vxlan_vnis and
- not alloc.allocated)]
- # Immediately delete vnis in chunks. This leaves no work for
- # flush at the end of transaction
- bulk_size = 100
- chunked_vnis = (vnis_to_remove[i:i + bulk_size] for i in
- range(0, len(vnis_to_remove), bulk_size))
- for vni_list in chunked_vnis:
- if vni_list:
- session.query(VxlanAllocation).filter(
- VxlanAllocation.vxlan_vni.in_(vni_list)).delete(
- synchronize_session=False)
- # collect vnis that need to be added
- vnis = list(vxlan_vnis - existing_vnis)
- chunked_vnis = (vnis[i:i + bulk_size] for i in
- range(0, len(vnis), bulk_size))
- for vni_list in chunked_vnis:
- bulk = [{'vxlan_vni': vni, 'allocated': False}
- for vni in vni_list]
- session.execute(VxlanAllocation.__table__.insert(), bulk)
-
def get_endpoints(self):
"""Get every vxlan endpoints from database."""
vxlan_endpoints = self._get_endpoints()
self.assertIsNone(
self.driver.get_allocation(self.session, (TUN_MAX + 5 + 1)))
+ def _test_sync_allocations_and_allocated(self, tunnel_id):
+ segment = {api.NETWORK_TYPE: self.TYPE,
+ api.PHYSICAL_NETWORK: None,
+ api.SEGMENTATION_ID: tunnel_id}
+ self.driver.reserve_provider_segment(self.session, segment)
+
+ self.driver.tunnel_ranges = UPDATED_TUNNEL_RANGES
+ self.driver.sync_allocations()
+
+ self.assertTrue(
+ self.driver.get_allocation(self.session, tunnel_id).allocated)
+
+ def test_sync_allocations_and_allocated_in_initial_range(self):
+ self._test_sync_allocations_and_allocated(TUN_MIN + 2)
+
+ def test_sync_allocations_and_allocated_in_final_range(self):
+ self._test_sync_allocations_and_allocated(TUN_MAX + 2)
+
+ def test_sync_allocations_no_op(self):
+
+ def verify_no_chunk(iterable, chunk_size):
+ # no segment removed/added
+ self.assertEqual(0, len(list(iterable)))
+ return []
+ with mock.patch.object(
+ type_tunnel, 'chunks', side_effect=verify_no_chunk) as chunks:
+ self.driver.sync_allocations()
+ self.assertEqual(2, len(chunks.mock_calls))
+
def test_partial_segment_is_partial_segment(self):
segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: None,
# License for the specific language governing permissions and limitations
# under the License.
-import mock
-
-from oslo_db import exception as db_exc
-from sqlalchemy.orm import exc as sa_exc
-import testtools
-
-from neutron.db import api as db_api
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import config
from neutron.plugins.ml2.drivers import type_gre
elif endpoint['ip_address'] == base_type_tunnel.TUNNEL_IP_TWO:
self.assertEqual(base_type_tunnel.HOST_TWO, endpoint['host'])
- def test_sync_allocations_entry_added_during_session(self):
- with mock.patch.object(self.driver, '_add_allocation',
- side_effect=db_exc.DBDuplicateEntry) as (
- mock_add_allocation):
- self.driver.sync_allocations()
- self.assertTrue(mock_add_allocation.called)
-
- def test__add_allocation_not_existing(self):
- session = db_api.get_session()
- _add_allocation(session, gre_id=1)
- self.driver._add_allocation(session, {1, 2})
- _get_allocation(session, 2)
-
- def test__add_allocation_existing_allocated_is_kept(self):
- session = db_api.get_session()
- _add_allocation(session, gre_id=1, allocated=True)
- self.driver._add_allocation(session, {2})
- _get_allocation(session, 1)
-
- def test__add_allocation_existing_not_allocated_is_removed(self):
- session = db_api.get_session()
- _add_allocation(session, gre_id=1)
- self.driver._add_allocation(session, {2})
- with testtools.ExpectedException(sa_exc.NoResultFound):
- _get_allocation(session, 1)
-
def test_get_mtu(self):
config.cfg.CONF.set_override('segment_mtu', 1500, group='ml2')
config.cfg.CONF.set_override('path_mtu', 1475, group='ml2')