openflow_controller = 127.0.0.1:6633
openflow_rest_api = 127.0.0.1:8080
+# tunnel key range: 0 < tunnel_key_min < tunnel_key_max
+# VLAN: 12bits, GRE, VXLAN: 24bits
+# tunnel_key_min = 1
+# tunnel_key_max = 0xffffff
+
[AGENT]
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility.
# under the License.
# @author: Isaku Yamahata
-import logging as LOG
import sys
import time
from quantum.common import config as logging_config
from quantum.common import constants
from quantum.openstack.common import cfg
+from quantum.openstack.common import log as LOG
from quantum.plugins.ryu.common import config
cfg.StrOpt('integration_bridge', default='br-int'),
cfg.StrOpt('openflow_controller', default='127.0.0.1:6633'),
cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080'),
+ cfg.IntOpt('tunnel_key_min', default=1),
+ cfg.IntOpt('tunnel_key_max', default=0xffffff)
]
agent_opts = [
# License for the specific language governing permissions and limitations
# under the License.
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import func
+from sqlalchemy.orm import exc as orm_exc
+
+from quantum.common import exceptions as q_exc
import quantum.db.api as db
-from quantum.db.models_v2 import Network
-from quantum.plugins.ryu.db import models_v2
+from quantum.db import models_v2
+from quantum.openstack.common import log as logging
+from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
+
+
+LOG = logging.getLogger(__name__)
def set_ofp_servers(hosts):
session = db.get_session()
- session.query(models_v2.OFPServer).delete()
+ session.query(ryu_models_v2.OFPServer).delete()
for (host_address, host_type) in hosts:
- host = models_v2.OFPServer(host_address, host_type)
+ host = ryu_models_v2.OFPServer(address=host_address,
+ host_type=host_type)
session.add(host)
session.flush()
def network_all_tenant_list():
session = db.get_session()
- return session.query(Network).all()
+ return session.query(models_v2.Network).all()
+
+
+class TunnelKey(object):
+ # VLAN: 12 bits
+ # GRE, VXLAN: 24bits
+ # TODO(yamahata): STT: 64bits
+ _KEY_MIN_HARD = 1
+ _KEY_MAX_HARD = 0xffffffff
+
+ def __init__(self, key_min=_KEY_MIN_HARD, key_max=_KEY_MAX_HARD):
+ self.key_min = key_min
+ self.key_max = key_max
+
+ if (key_min < self._KEY_MIN_HARD or key_max > self._KEY_MAX_HARD or
+ key_min > key_max):
+ raise ValueError('Invalid tunnel key options '
+ 'tunnel_key_min: %d tunnel_key_max: %d. '
+ 'Using default value' % (key_min, key_min))
+
+ def _last_key(self, session):
+ try:
+ return session.query(ryu_models_v2.TunnelKeyLast).one()
+ except orm_exc.MultipleResultsFound:
+ max_key = session.query(
+ func.max(ryu_models_v2.TunnelKeyLast.last_key))
+ if max_key > self.key_max:
+ max_key = self.key_min
+
+ session.query(ryu_models_v2.TunnelKeyLast).delete()
+ last_key = ryu_models_v2.TunnelKeyLast(last_key=max_key)
+ except orm_exc.NoResultFound:
+ last_key = ryu_models_v2.TunnelKeyLast(last_key=self.key_min)
+
+ session.add(last_key)
+ session.flush()
+ return session.query(ryu_models_v2.TunnelKeyLast).one()
+
+ def _find_key(self, session, last_key):
+ """
+ Try to find unused tunnel key in TunnelKey table starting
+ from last_key + 1.
+ When all keys are used, raise sqlalchemy.orm.exc.NoResultFound
+ """
+ # key 0 is used for special meanings. So don't allocate 0.
+
+ # sqlite doesn't support
+ # '(select order by limit) union all (select order by limit) '
+ # 'order by limit'
+ # So do it manually
+ # new_key = session.query("new_key").from_statement(
+ # # If last_key + 1 isn't used, it's the result
+ # 'SELECT new_key '
+ # 'FROM (SELECT :last_key + 1 AS new_key) q1 '
+ # 'WHERE NOT EXISTS '
+ # '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
+ #
+ # 'UNION ALL '
+ #
+ # # if last_key + 1 used,
+ # # find the least unused key from last_key + 1
+ # '(SELECT t.tunnel_key + 1 AS new_key '
+ # 'FROM tunnelkeys t '
+ # 'WHERE NOT EXISTS '
+ # '(SELECT 1 FROM tunnelkeys ti '
+ # ' WHERE ti.tunnel_key = t.tunnel_key + 1) '
+ # 'AND t.tunnel_key >= :last_key '
+ # 'ORDER BY new_key LIMIT 1) '
+ #
+ # 'ORDER BY new_key LIMIT 1'
+ # ).params(last_key=last_key).one()
+ try:
+ new_key = session.query("new_key").from_statement(
+ # If last_key + 1 isn't used, it's the result
+ 'SELECT new_key '
+ 'FROM (SELECT :last_key + 1 AS new_key) q1 '
+ 'WHERE NOT EXISTS '
+ '(SELECT 1 FROM tunnelkeys WHERE tunnel_key = :last_key + 1) '
+ ).params(last_key=last_key).one()
+ except orm_exc.NoResultFound:
+ new_key = session.query("new_key").from_statement(
+ # if last_key + 1 used,
+ # find the least unused key from last_key + 1
+ '(SELECT t.tunnel_key + 1 AS new_key '
+ 'FROM tunnelkeys t '
+ 'WHERE NOT EXISTS '
+ '(SELECT 1 FROM tunnelkeys ti '
+ ' WHERE ti.tunnel_key = t.tunnel_key + 1) '
+ 'AND t.tunnel_key >= :last_key '
+ 'ORDER BY new_key LIMIT 1) '
+ ).params(last_key=last_key).one()
+
+ new_key = new_key[0] # the result is tuple.
+ LOG.debug("last_key %s new_key %s", last_key, new_key)
+ if new_key > self.key_max:
+ LOG.debug("no key found")
+ raise orm_exc.NoResultFound()
+ return new_key
+
+ def _allocate(self, session, network_id):
+ last_key = self._last_key(session)
+ try:
+ new_key = self._find_key(session, last_key.last_key)
+ except orm_exc.NoResultFound:
+ new_key = self._find_key(session, self.key_min)
+
+ tunnel_key = ryu_models_v2.TunnelKey(network_id=network_id,
+ tunnel_key=new_key)
+ last_key.last_key = new_key
+ session.add(tunnel_key)
+ return new_key
+
+ _TRANSACTION_RETRY_MAX = 16
+
+ def allocate(self, session, network_id):
+ count = 0
+ while True:
+ session.begin(subtransactions=True)
+ try:
+ new_key = self._allocate(session, network_id)
+ session.commit()
+ break
+ except sa_exc.SQLAlchemyError:
+ session.rollback()
+
+ count += 1
+ if count > self._TRANSACTION_RETRY_MAX:
+ # if this happens too often, increase _TRANSACTION_RETRY_MAX
+ LOG.warn("Transaction retry reaches to %d. "
+ "abandan to allocate tunnel key." % count)
+ raise q_exc.ResourceExhausted()
+
+ return new_key
+
+ def delete(self, session, network_id):
+ session.query(ryu_models_v2.TunnelKey).filter_by(
+ network_id=network_id).delete()
+ session.flush()
+
+ def all_list(self):
+ session = db.get_session()
+ return session.query(ryu_models_v2.TunnelKey).all()
# License for the specific language governing permissions and limitations
# under the License.
-from sqlalchemy import Column, Integer, String
+import sqlalchemy as sa
+from quantum.db import model_base
from quantum.db import models_v2
-class OFPServer(models_v2.model_base.BASEV2):
- """Openflow Server/API address"""
+class OFPServer(model_base.BASEV2):
+ """Openflow Server/API address."""
__tablename__ = 'ofp_server'
- id = Column(Integer, primary_key=True, autoincrement=True)
- address = Column(String(255)) # netloc <host ip address>:<port>
- host_type = Column(String(255)) # server type
- # Controller, REST_API
-
- def __init__(self, address, host_type):
- self.address = address
- self.host_type = host_type
+ id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
+ address = sa.Column(sa.String(64)) # netloc <host ip address>:<port>
+ host_type = sa.Column(sa.String(255)) # server type
+ # Controller, REST_API
def __repr__(self):
return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
self.host_type)
+
+
+class TunnelKeyLast(model_base.BASEV2):
+ """Lastly allocated Tunnel key. The next key allocation will be started
+ from this value + 1
+ """
+ last_key = sa.Column(sa.Integer, primary_key=True)
+
+ def __repr__(self):
+ return "<TunnelKeyLast(%x)>" % self.last_key
+
+
+class TunnelKey(model_base.BASEV2):
+ """Netowrk ID <-> tunnel key mapping."""
+ network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
+ nullable=False)
+ tunnel_key = sa.Column(sa.Integer, primary_key=True,
+ nullable=False, autoincrement=False)
+
+ def __repr__(self):
+ return "<TunnelKey(%s,%x)>" % (self.network_id, self.tunnel_key)
# under the License.
# @author: Isaku Yamahata
-import logging
-
from ryu.app import client
from ryu.app import rest_nw_id
+from sqlalchemy.orm import exc as sql_exc
from quantum.common import exceptions as q_exc
from quantum.common import topics
from quantum.db import l3_db
from quantum.db import models_v2
from quantum.openstack.common import cfg
+from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu import ofp_service_type
+
LOG = logging.getLogger(__name__)
options.update({"reconnect_interval": reconnect_interval})
db.configure_db(options)
+ self.tunnel_key = db_api_v2.TunnelKey(
+ cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
ofp_con_host = cfg.CONF.OVS.openflow_controller
ofp_api_host = cfg.CONF.OVS.openflow_rest_api
db_api_v2.set_ofp_servers(hosts)
self.client = client.OFPClient(ofp_api_host)
- self.client.update_network(rest_nw_id.NW_ID_EXTERNAL)
+ self.tun_client = client.TunnelClient(ofp_api_host)
+ for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
+ if nw_id != rest_nw_id.NW_ID_UNKNOWN:
+ self.client.update_network(nw_id)
self._setup_rpc()
# register known all network list on startup
self.conn.consume_in_thread()
def _create_all_tenant_network(self):
- networks = db_api_v2.network_all_tenant_list()
- for net in networks:
+ for net in db_api_v2.network_all_tenant_list():
self.client.update_network(net.id)
+ for tun in self.tunnel_key.all_list():
+ self.tun_client.update_tunnel_key(tun.network_id, tun.tunnel_key)
+
+ def _client_create_network(self, net_id, tunnel_key):
+ self.client.create_network(net_id)
+ self.tun_client.create_tunnel_key(net_id, tunnel_key)
+
+ def _client_delete_network(self, net_id):
+ client.ignore_http_not_found(
+ lambda: self.client.delete_network(net_id))
+ client.ignore_http_not_found(
+ lambda: self.tun_client.delete_tunnel_key(net_id))
def create_network(self, context, network):
session = context.session
with session.begin(subtransactions=True):
net = super(RyuQuantumPluginV2, self).create_network(context,
network)
- self.client.create_network(net['id'])
self._process_l3_create(context, network['network'], net['id'])
self._extend_network_dict_l3(context, net)
+
+ tunnel_key = self.tunnel_key.allocate(session, net['id'])
+ try:
+ self._client_create_network(net['id'], tunnel_key)
+ except:
+ self._client_delete_network(net['id'])
+ raise
+
return net
def update_network(self, context, id, network):
return net
def delete_network(self, context, id):
+ self._client_delete_network(id)
session = context.session
with session.begin(subtransactions=True):
+ self.tunnel_key.delete(session, id)
super(RyuQuantumPluginV2, self).delete_network(context, id)
- self.client.delete_network(id)
def get_network(self, context, id, fields=None):
net = super(RyuQuantumPluginV2, self).get_network(context, id, None)
# License for the specific language governing permissions and limitations
# under the License.
-import os
+import operator
import unittest2
from quantum.db import api as db
self.assertEqual(len(servers), 2)
for s in servers:
self.assertTrue((s.address, s.host_type) in self.hosts)
+
+ @staticmethod
+ def _tunnel_key_sort(key_list):
+ key_list.sort(key=operator.attrgetter('tunnel_key'))
+ return [(key.network_id, key.tunnel_key) for key in key_list]
+
+ def test_key_allocation(self):
+ tunnel_key = db_api_v2.TunnelKey()
+ session = db.get_session()
+ network_id0 = u'network-id-0'
+ key0 = tunnel_key.allocate(session, network_id0)
+ network_id1 = u'network-id-1'
+ key1 = tunnel_key.allocate(session, network_id1)
+ key_list = tunnel_key.all_list()
+ self.assertEqual(len(key_list), 2)
+
+ expected_list = [(network_id0, key0), (network_id1, key1)]
+ self.assertEqual(self._tunnel_key_sort(key_list), expected_list)
+
+ tunnel_key.delete(session, network_id0)
+ key_list = tunnel_key.all_list()
+ self.assertEqual(self._tunnel_key_sort(key_list),
+ [(network_id1, key1)])
ryu_app_client = ryu_app_mod.client
rest_nw_id = ryu_app_mod.rest_nw_id
rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
+ rest_nw_id.NW_ID_RESERVED = '__NW_ID_RESERVED__'
+ rest_nw_id.NW_ID_VPORT_GRE = '__NW_ID_VPORT_GRE__'
rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
+ rest_nw_id.RESERVED_NETWORK_IDS = [
+ rest_nw_id.NW_ID_EXTERNAL,
+ rest_nw_id.NW_ID_RESERVED,
+ rest_nw_id.NW_ID_VPORT_GRE,
+ rest_nw_id.NW_ID_UNKNOWN,
+ ]
return mock.patch.dict('sys.modules',
{'ryu': ryu_mod,
'ryu.app': ryu_app_mod,
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-
-def patch_fake_ryu_client():
- ryu_mod = mock.Mock()
- ryu_app_mod = ryu_mod.app
- ryu_app_client = ryu_app_mod.client
- rest_nw_id = ryu_app_mod.rest_nw_id
- rest_nw_id.NW_ID_EXTERNAL = '__NW_ID_EXTERNAL__'
- rest_nw_id.NW_ID_UNKNOWN = '__NW_ID_UNKNOWN__'
- return mock.patch.dict('sys.modules',
- {'ryu': ryu_mod,
- 'ryu.app': ryu_app_mod,
- 'ryu.app.client': ryu_app_client,
- 'ryu.app.rest_nw_id': rest_nw_id})