]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
plugin/ryu: add tunnel support
authorIsaku Yamahata <yamahata@valinux.co.jp>
Fri, 10 Aug 2012 05:53:08 +0000 (14:53 +0900)
committerIsaku Yamahata <yamahata@valinux.co.jp>
Mon, 12 Nov 2012 19:20:10 +0000 (04:20 +0900)
blueprint ryu-tunnel-support
This patch adds tunneling support to Ryu plugin.
Ryu supports gre tunneling which requires quantum ryu plugin to manage
key assignment.

Change-Id: I9f8db0913941c3da13045170e1557d333f0c68e2
Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
etc/quantum/plugins/ryu/ryu.ini
quantum/plugins/ryu/agent/ryu_quantum_agent.py
quantum/plugins/ryu/common/config.py
quantum/plugins/ryu/db/api_v2.py
quantum/plugins/ryu/db/models_v2.py
quantum/plugins/ryu/ryu_quantum_plugin.py
quantum/tests/unit/ryu/test_ryu_db.py
quantum/tests/unit/ryu/test_ryu_plugin.py
quantum/tests/unit/ryu/utils.py [deleted file]

index 42e5525de7eba068b208770dd513cbfd4ee40bec..cb376a12a5bf2d42c8951791c16bb77a6e28368b 100644 (file)
@@ -12,6 +12,11 @@ integration_bridge = br-int
 openflow_controller = 127.0.0.1:6633
 openflow_rest_api = 127.0.0.1:8080
 
+# tunnel key range: 0 < tunnel_key_min < tunnel_key_max
+# VLAN: 12bits, GRE, VXLAN: 24bits
+# tunnel_key_min = 1
+# tunnel_key_max = 0xffffff
+
 [AGENT]
 # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
 # root filter facility.
index 2a9d6e0945ec0b593e6353131da767332ec02e44..593c53910dab3fc9efe61a0bf235caa3cfad1270 100755 (executable)
@@ -20,7 +20,6 @@
 #    under the License.
 # @author: Isaku Yamahata
 
-import logging as LOG
 import sys
 import time
 
@@ -33,6 +32,7 @@ from quantum.agent.linux.ovs_lib import VifPort
 from quantum.common import config as logging_config
 from quantum.common import constants
 from quantum.openstack.common import cfg
+from quantum.openstack.common import log as LOG
 from quantum.plugins.ryu.common import config
 
 
index 280560d67bc5d201028e3a458e8fdfaecc9ef6be..8d771e6da95d72f36f702f9392c32035ae5e0219 100644 (file)
@@ -27,6 +27,8 @@ ovs_opts = [
     cfg.StrOpt('integration_bridge', default='br-int'),
     cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'),
     cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'),
+    cfg.IntOpt('tunnel_key_min', default=1),
+    cfg.IntOpt('tunnel_key_max', default=0xffffff)
 ]
 
 agent_opts = [
index 0bd65eb73c3213e87a4e2deb875fa8d9522c8c99..b61416c85595aa31dfccd5447b0bc740301135fa 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import func
+from sqlalchemy.orm import exc as orm_exc
+
+from quantum.common import exceptions as q_exc
 import quantum.db.api as db
-from quantum.db.models_v2 import Network
-from quantum.plugins.ryu.db import models_v2
+from quantum.db import models_v2
+from quantum.openstack.common import log as logging
+from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
+
+
+LOG = logging.getLogger(__name__)
 
 
 def set_ofp_servers(hosts):
     session = db.get_session()
-    session.query(models_v2.OFPServer).delete()
+    session.query(ryu_models_v2.OFPServer).delete()
     for (host_address, host_type) in hosts:
-        host = models_v2.OFPServer(host_address, host_type)
+        host = ryu_models_v2.OFPServer(address=host_address,
+                                       host_type=host_type)
         session.add(host)
     session.flush()
 
 
 def network_all_tenant_list():
     session = db.get_session()
-    return session.query(Network).all()
+    return session.query(models_v2.Network).all()
+
+
+class TunnelKey(object):
+    # VLAN: 12 bits
+    # GRE, VXLAN: 24bits
+    # TODO(yamahata): STT: 64bits
+    _KEY_MIN_HARD = 1
+    _KEY_MAX_HARD = 0xffffffff
+
+    def __init__(self, key_min=_KEY_MIN_HARD, key_max=_KEY_MAX_HARD):
+        self.key_min = key_min
+        self.key_max = key_max
+
+        if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or
+                key_min > key_max):
+            raise ValueError('Invalid tunnel key options '
+                             'tunnel_key_min: %d tunnel_key_max: %d. '
+                             'Using default value' % (key_min, key_min))
+
+    def _last_key(self, session):
+        try:
+            return session.query(ryu_models_v2.TunnelKeyLast).one()
+        except orm_exc.MultipleResultsFound:
+            max_key = session.query(
+                func.max(ryu_models_v2.TunnelKeyLast.last_key))
+            if max_key > self.key_max:
+                max_key = self.key_min
+
+            session.query(ryu_models_v2.TunnelKeyLast).delete()
+            last_key = ryu_models_v2.TunnelKeyLast(last_key=max_key)
+        except orm_exc.NoResultFound:
+            last_key = ryu_models_v2.TunnelKeyLast(last_key=self.key_min)
+
+        session.add(last_key)
+        session.flush()
+        return session.query(ryu_models_v2.TunnelKeyLast).one()
+
+    def _find_key(self, session, last_key):
+        """
+        Try to find unused tunnel key in TunnelKey table starting
+        from last_key + 1.
+        When all keys are used, raise sqlalchemy.orm.exc.NoResultFound
+        """
+        # key 0 is used for special meanings. So don't allocate 0.
+
+        # sqlite doesn't support
+        # '(select order by limit) union all (select order by limit) '
+        # 'order by limit'
+        # So do it manually
+        # new_key = session.query("new_key").from_statement(
+        #     # If last_key + 1 isn't used, it's the result
+        #     'SELECT new_key '
+        #     'FROM (SELECT :last_key + 1 AS new_key) q1 '
+        #     'WHERE NOT EXISTS '
+        #     '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
+        #
+        #     'UNION ALL '
+        #
+        #     # if last_key + 1 used,
+        #     # find the least unused key from last_key + 1
+        #     '(SELECT t.tunnel_key + 1 AS new_key '
+        #     'FROM tunnelkeys t '
+        #     'WHERE NOT EXISTS '
+        #     '(SELECT 1 FROM tunnelkeys ti '
+        #     ' WHERE ti.tunnel_key = t.tunnel_key + 1) '
+        #     'AND t.tunnel_key >= :last_key '
+        #     'ORDER BY new_key LIMIT 1) '
+        #
+        #     'ORDER BY new_key LIMIT 1'
+        # ).params(last_key=last_key).one()
+        try:
+            new_key = session.query("new_key").from_statement(
+                # If last_key + 1 isn't used, it's the result
+                'SELECT new_key '
+                'FROM (SELECT :last_key + 1 AS new_key) q1 '
+                'WHERE NOT EXISTS '
+                '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
+            ).params(last_key=last_key).one()
+        except orm_exc.NoResultFound:
+            new_key = session.query("new_key").from_statement(
+                # if last_key + 1 used,
+                # find the least unused key from last_key + 1
+                '(SELECT t.tunnel_key + 1 AS new_key '
+                'FROM tunnelkeys t '
+                'WHERE NOT EXISTS '
+                '(SELECT 1 FROM tunnelkeys ti '
+                ' WHERE ti.tunnel_key = t.tunnel_key + 1) '
+                'AND t.tunnel_key >= :last_key '
+                'ORDER BY new_key LIMIT 1) '
+            ).params(last_key=last_key).one()
+
+        new_key = new_key[0]  # the result is tuple.
+        LOG.debug("last_key %s new_key %s", last_key, new_key)
+        if new_key > self.key_max:
+            LOG.debug("no key found")
+            raise orm_exc.NoResultFound()
+        return new_key
+
+    def _allocate(self, session, network_id):
+        last_key = self._last_key(session)
+        try:
+            new_key = self._find_key(session, last_key.last_key)
+        except orm_exc.NoResultFound:
+            new_key = self._find_key(session, self.key_min)
+
+        tunnel_key = ryu_models_v2.TunnelKey(network_id=network_id,
+                                             tunnel_key=new_key)
+        last_key.last_key = new_key
+        session.add(tunnel_key)
+        return new_key
+
+    _TRANSACTION_RETRY_MAX = 16
+
+    def allocate(self, session, network_id):
+        count = 0
+        while True:
+            session.begin(subtransactions=True)
+            try:
+                new_key = self._allocate(session, network_id)
+                session.commit()
+                break
+            except sa_exc.SQLAlchemyError:
+                session.rollback()
+
+            count += 1
+            if count > self._TRANSACTION_RETRY_MAX:
+                # if this happens too often, increase _TRANSACTION_RETRY_MAX
+                LOG.warn("Transaction retry reaches to %d. "
+                         "abandan to allocate tunnel key." % count)
+                raise q_exc.ResourceExhausted()
+
+        return new_key
+
+    def delete(self, session, network_id):
+        session.query(ryu_models_v2.TunnelKey).filter_by(
+            network_id=network_id).delete()
+        session.flush()
+
+    def all_list(self):
+        session = db.get_session()
+        return session.query(ryu_models_v2.TunnelKey).all()
index ce0a8301ba06ff4c190116dad6d9c8ff76dbd6ac..8a34ee254f64efbe20b7d14c11a96d190cff3199 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from sqlalchemy import Column, Integer, String
+import sqlalchemy as sa
 
+from quantum.db import model_base
 from quantum.db import models_v2
 
 
-class OFPServer(models_v2.model_base.BASEV2):
-    """Openflow Server/API address"""
+class OFPServer(model_base.BASEV2):
+    """Openflow Server/API address."""
     __tablename__ = 'ofp_server'
 
-    id = Column(Integer, primary_key=True, autoincrement=True)
-    address = Column(String(255))       # netloc <host ip address>:<port>
-    host_type = Column(String(255))     # server type
-                                        # Controller, REST_API
-
-    def __init__(self, address, host_type):
-        self.address = address
-        self.host_type = host_type
+    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+    address = sa.Column(sa.String(64))        # netloc <host ip address>:<port>
+    host_type = sa.Column(sa.String(255))     # server type
+                                              # Controller, REST_API
 
     def __repr__(self):
         return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
                                           self.host_type)
+
+
+class TunnelKeyLast(model_base.BASEV2):
+    """Lastly allocated Tunnel key. The next key allocation will be started
+    from this value + 1
+    """
+    last_key = sa.Column(sa.Integer, primary_key=True)
+
+    def __repr__(self):
+        return "<TunnelKeyLast(%x)>" % self.last_key
+
+
+class TunnelKey(model_base.BASEV2):
+    """Netowrk ID <-> tunnel key mapping."""
+    network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
+                           nullable=False)
+    tunnel_key = sa.Column(sa.Integer, primary_key=True,
+                           nullable=False, autoincrement=False)
+
+    def __repr__(self):
+        return "<TunnelKey(%s,%x)>" % (self.network_id, self.tunnel_key)
index 9f95c3b78fd6d26f6acb7d671e7794f441c5b6e9..57303e459c3a0ef2bb1360c5a6c338ce4fcf7a25 100644 (file)
 #    under the License.
 # @author: Isaku Yamahata
 
-import logging
-
 from ryu.app import client
 from ryu.app import rest_nw_id
+from sqlalchemy.orm import exc as sql_exc
 
 from quantum.common import exceptions as q_exc
 from quantum.common import topics
@@ -29,12 +28,14 @@ from quantum.db.dhcp_rpc_base import DhcpRpcCallbackMixin
 from quantum.db import l3_db
 from quantum.db import models_v2
 from quantum.openstack.common import cfg
+from quantum.openstack.common import log as logging
 from quantum.openstack.common import rpc
 from quantum.openstack.common.rpc import dispatcher
 from quantum.plugins.ryu.common import config
 from quantum.plugins.ryu.db import api_v2 as db_api_v2
 from quantum.plugins.ryu import ofp_service_type
 
+
 LOG = logging.getLogger(__name__)
 
 
@@ -50,6 +51,8 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         options.update({"reconnect_interval": reconnect_interval})
         db.configure_db(options)
 
+        self.tunnel_key = db_api_v2.TunnelKey(
+            cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
         ofp_con_host = cfg.CONF.OVS.openflow_controller
         ofp_api_host = cfg.CONF.OVS.openflow_rest_api
 
@@ -61,7 +64,10 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         db_api_v2.set_ofp_servers(hosts)
 
         self.client = client.OFPClient(ofp_api_host)
-        self.client.update_network(rest_nw_id.NW_ID_EXTERNAL)
+        self.tun_client = client.TunnelClient(ofp_api_host)
+        for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
+            if nw_id != rest_nw_id.NW_ID_UNKNOWN:
+                self.client.update_network(nw_id)
         self._setup_rpc()
 
         # register known all network list on startup
@@ -75,18 +81,36 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         self.conn.consume_in_thread()
 
     def _create_all_tenant_network(self):
-        networks = db_api_v2.network_all_tenant_list()
-        for net in networks:
+        for net in db_api_v2.network_all_tenant_list():
             self.client.update_network(net.id)
+        for tun in self.tunnel_key.all_list():
+            self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key)
+
+    def _client_create_network(self, net_id, tunnel_key):
+        self.client.create_network(net_id)
+        self.tun_client.create_tunnel_key(net_id, tunnel_key)
+
+    def _client_delete_network(self, net_id):
+        client.ignore_http_not_found(
+            lambda: self.client.delete_network(net_id))
+        client.ignore_http_not_found(
+            lambda: self.tun_client.delete_tunnel_key(net_id))
 
     def create_network(self, context, network):
         session = context.session
         with session.begin(subtransactions=True):
             net = super(RyuQuantumPluginV2, self).create_network(context,
                                                                  network)
-            self.client.create_network(net['id'])
             self._process_l3_create(context, network['network'], net['id'])
             self._extend_network_dict_l3(context, net)
+
+            tunnel_key = self.tunnel_key.allocate(session, net['id'])
+            try:
+                self._client_create_network(net['id'], tunnel_key)
+            except:
+                self._client_delete_network(net['id'])
+                raise
+
         return net
 
     def update_network(self, context, id, network):
@@ -99,10 +123,11 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         return net
 
     def delete_network(self, context, id):
+        self._client_delete_network(id)
         session = context.session
         with session.begin(subtransactions=True):
+            self.tunnel_key.delete(session, id)
             super(RyuQuantumPluginV2, self).delete_network(context, id)
-            self.client.delete_network(id)
 
     def get_network(self, context, id, fields=None):
         net = super(RyuQuantumPluginV2, self).get_network(context, id, None)
index aaf9d7b4c4eb653730b4401b653bbb6d9815664e..f09f95830fc45806cf11ebf10e7c86306bb8a61e 100644 (file)
@@ -15,7 +15,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import os
+import operator
 import unittest2
 
 from quantum.db import api as db
@@ -52,3 +52,26 @@ class RyuDBTest(unittest2.TestCase):
         self.assertEqual(len(servers), 2)
         for s in servers:
             self.assertTrue((s.address, s.host_type) in self.hosts)
+
+    @staticmethod
+    def _tunnel_key_sort(key_list):
+        key_list.sort(key=operator.attrgetter('tunnel_key'))
+        return [(key.network_id, key.tunnel_key) for key in key_list]
+
+    def test_key_allocation(self):
+        tunnel_key = db_api_v2.TunnelKey()
+        session = db.get_session()
+        network_id0 = u'network-id-0'
+        key0 = tunnel_key.allocate(session, network_id0)
+        network_id1 = u'network-id-1'
+        key1 = tunnel_key.allocate(session, network_id1)
+        key_list = tunnel_key.all_list()
+        self.assertEqual(len(key_list), 2)
+
+        expected_list = [(network_id0, key0), (network_id1, key1)]
+        self.assertEqual(self._tunnel_key_sort(key_list), expected_list)
+
+        tunnel_key.delete(session, network_id0)
+        key_list = tunnel_key.all_list()
+        self.assertEqual(self._tunnel_key_sort(key_list),
+                         [(network_id1, key1)])
index fb508aa34a6162378609a5c7abb2f16ebf1d4caa..366b2fe1fe8f45212cf7d4a8e97af631f130a47d 100644 (file)
@@ -28,7 +28,15 @@ class RyuPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
         ryu_app_client = ryu_app_mod.client
         rest_nw_id = ryu_app_mod.rest_nw_id
         rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
+        rest_nw_id.NW_ID_RESERVED = '__NW_ID_RESERVED__'
+        rest_nw_id.NW_ID_VPORT_GRE = '__NW_ID_VPORT_GRE__'
         rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
+        rest_nw_id.RESERVED_NETWORK_IDS = [
+            rest_nw_id.NW_ID_EXTERNAL,
+            rest_nw_id.NW_ID_RESERVED,
+            rest_nw_id.NW_ID_VPORT_GRE,
+            rest_nw_id.NW_ID_UNKNOWN,
+        ]
         return mock.patch.dict('sys.modules',
                                {'ryu': ryu_mod,
                                 'ryu.app': ryu_app_mod,
diff --git a/quantum/tests/unit/ryu/utils.py b/quantum/tests/unit/ryu/utils.py
deleted file mode 100644 (file)
index eaa5541..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import mock
-
-
-def patch_fake_ryu_client():
-    ryu_mod = mock.Mock()
-    ryu_app_mod = ryu_mod.app
-    ryu_app_client = ryu_app_mod.client
-    rest_nw_id = ryu_app_mod.rest_nw_id
-    rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
-    rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
-    return mock.patch.dict('sys.modules',
-                           {'ryu': ryu_mod,
-                            'ryu.app': ryu_app_mod,
-                            'ryu.app.client': ryu_app_client,
-                            'ryu.app.rest_nw_id': rest_nw_id})