]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Improve data access method of ryu-agent
authorYoshihiro Kaneko <ykaneko0929@gmail.com>
Wed, 30 Jan 2013 06:50:41 +0000 (15:50 +0900)
committerYoshihiro Kaneko <ykaneko0929@gmail.com>
Fri, 1 Feb 2013 07:57:06 +0000 (16:57 +0900)
fixes bug #1110174

This patch implement rpc in ryu-agent, instead of accessing a
database directly.
Because it was not necessary to transmit information via database,
therefore the table is eliminated.

Also, I remove openflow controller stuff from a configuration file
of the Ryu plugin because it was not used anymore.

Change-Id: I5e261297c3f92c6a1ac5df229084176e84694e87

etc/quantum/plugins/ryu/ryu.ini
quantum/db/migration/alembic_migrations/versions/49332180ca96_ryu_plugin_update.py [new file with mode: 0644]
quantum/plugins/ryu/agent/ryu_quantum_agent.py
quantum/plugins/ryu/common/config.py
quantum/plugins/ryu/db/api_v2.py
quantum/plugins/ryu/db/models_v2.py
quantum/plugins/ryu/ofp_service_type.py [deleted file]
quantum/plugins/ryu/ryu_quantum_plugin.py
quantum/tests/unit/ryu/test_defaults.py
quantum/tests/unit/ryu/test_ryu_db.py

index 2094ccb2521e22bc6059cccb7b8408681867a59b..3d687c7bc50756c7834bd15122a3b49affed3641 100644 (file)
@@ -16,9 +16,7 @@ sql_connection = sqlite://
 [OVS]
 integration_bridge = br-int
 
-# openflow_controller = <host IP address of ofp controller>:<port: 6633>
 # openflow_rest_api = <host IP address of ofp rest api service>:<port: 8080>
-openflow_controller = 127.0.0.1:6633
 openflow_rest_api = 127.0.0.1:8080
 
 # tunnel key range: 0 < tunnel_key_min < tunnel_key_max
diff --git a/quantum/db/migration/alembic_migrations/versions/49332180ca96_ryu_plugin_update.py b/quantum/db/migration/alembic_migrations/versions/49332180ca96_ryu_plugin_update.py
new file mode 100644 (file)
index 0000000..dd17c25
--- /dev/null
@@ -0,0 +1,59 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack LLC
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""ryu plugin update
+
+Revision ID: 49332180ca96
+Revises: 1149d7de0cfa
+Create Date: 2013-01-30 07:52:58.472885
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '49332180ca96'
+down_revision = '1149d7de0cfa'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+    'quantum.plugins.ryu.ryu_quantum_plugin.RyuQuantumPluginV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from quantum.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    op.drop_table('ofp_server')
+
+
+def downgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    op.create_table(
+        'ofp_server',
+        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+        sa.Column('address', sa.String(length=255)),
+        sa.Column('host_type', sa.String(length=255)),
+        sa.PrimaryKeyConstraint(u'id')
+    )
index e20dba9ca169adca2bce746a6bec720fbe2b2cb4..f68d6384f59fb0bf2c6b341e154edc83922986e9 100755 (executable)
@@ -28,11 +28,14 @@ import netifaces
 from ryu.app import client
 from ryu.app import conf_switch_key
 from ryu.app import rest_nw_id
-from sqlalchemy.ext.sqlsoup import SqlSoup
 
 from quantum.agent.linux import ovs_lib
 from quantum.agent.linux.ovs_lib import VifPort
+from quantum.agent import rpc as agent_rpc
 from quantum.common import config as logging_config
+from quantum.common import exceptions as q_exc
+from quantum.common import topics
+from quantum import q_context
 from quantum.openstack.common import cfg
 from quantum.openstack.common.cfg import NoSuchGroupError
 from quantum.openstack.common.cfg import NoSuchOptError
@@ -40,7 +43,6 @@ from quantum.openstack.common import log
 from quantum.plugins.ryu.common import config
 
 
-cfg.CONF.import_opt('sql_connection', 'quantum.db.api', 'DATABASE')
 LOG = log.getLogger(__name__)
 
 
@@ -148,29 +150,42 @@ class VifPortSet(object):
                                  port.switch.datapath_id, port.ofport)
 
 
+class RyuPluginApi(agent_rpc.PluginApi):
+    def get_ofp_rest_api_addr(self, context):
+        LOG.debug(_("Get Ryu rest API address"))
+        return self.call(context,
+                         self.make_msg('get_ofp_rest_api'),
+                         topic=self.topic)
+
+
 class OVSQuantumOFPRyuAgent(object):
-    def __init__(self, integ_br, ofp_rest_api_addr,
-                 tunnel_ip, ovsdb_ip, ovsdb_port,
+    def __init__(self, integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
                  root_helper):
         super(OVSQuantumOFPRyuAgent, self).__init__()
-        self.int_br = None
-        self.vif_ports = None
-        self._setup_integration_br(root_helper, integ_br,
-                                   ofp_rest_api_addr,
-                                   tunnel_ip, ovsdb_port, ovsdb_ip)
+        self._setup_rpc()
+        self._setup_integration_br(root_helper, integ_br, tunnel_ip,
+                                   ovsdb_port, ovsdb_ip)
+
+    def _setup_rpc(self):
+        self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
+        self.context = q_context.get_admin_context_without_session()
 
     def _setup_integration_br(self, root_helper, integ_br,
-                              ofp_rest_api_addr,
                               tunnel_ip, ovsdb_port, ovsdb_ip):
         self.int_br = OVSBridge(integ_br, root_helper)
         self.int_br.find_datapath_id()
 
-        ryu_rest_client = client.OFPClient(ofp_rest_api_addr)
+        rest_api_addr = self.plugin_rpc.get_ofp_rest_api_addr(self.context)
+        if not rest_api_addr:
+            raise q_exc.Invalid(_("Ryu rest API port isn't specified"))
+        LOG.debug(_("Going to ofp controller mode %s"), rest_api_addr)
+
+        ryu_rest_client = client.OFPClient(rest_api_addr)
 
         self.vif_ports = VifPortSet(self.int_br, ryu_rest_client)
         self.vif_ports.setup()
 
-        sc_client = client.SwitchConfClient(ofp_rest_api_addr)
+        sc_client = client.SwitchConfClient(rest_api_addr)
         sc_client.set_key(self.int_br.datapath_id,
                           conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)
 
@@ -180,31 +195,6 @@ class OVSQuantumOFPRyuAgent(object):
                           'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
 
 
-def check_ofp_rest_api_addr(db):
-    LOG.debug(_("Checking db"))
-
-    servers = db.ofp_server.all()
-
-    ofp_controller_addr = None
-    ofp_rest_api_addr = None
-    for serv in servers:
-        if serv.host_type == "REST_API":
-            ofp_rest_api_addr = serv.address
-        elif serv.host_type == "controller":
-            ofp_controller_addr = serv.address
-        else:
-            LOG.warn(_("Ignoring unknown server type %s"), serv)
-
-    LOG.debug(_("API %s"), ofp_rest_api_addr)
-    if ofp_controller_addr:
-        LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
-    if not ofp_rest_api_addr:
-        raise RuntimeError(_("Ryu rest API port isn't specified"))
-
-    LOG.debug(_("Going to ofp controller mode %s"), ofp_rest_api_addr)
-    return ofp_rest_api_addr
-
-
 def main():
     cfg.CONF(project='quantum')
 
@@ -212,13 +202,6 @@ def main():
 
     integ_br = cfg.CONF.OVS.integration_bridge
     root_helper = cfg.CONF.AGENT.root_helper
-    options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
-    db = SqlSoup(options["sql_connection"])
-
-    LOG.info(_("Connecting to database \"%(database)s\" on %(host)s"),
-             {"database": db.engine.url.database,
-              "host": db.engine.url.host})
-    ofp_rest_api_addr = check_ofp_rest_api_addr(db)
 
     tunnel_ip = _get_tunnel_ip()
     LOG.debug(_('tunnel_ip %s'), tunnel_ip)
@@ -227,8 +210,8 @@ def main():
     ovsdb_ip = _get_ovsdb_ip()
     LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
     try:
-        OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr,
-                              tunnel_ip, ovsdb_ip, ovsdb_port, root_helper)
+        OVSQuantumOFPRyuAgent(integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
+                              root_helper)
     except httplib.HTTPException, e:
         LOG.error(_("Initialization failed: %s"), e)
         sys.exit(1)
index c8b75b4ec96e496089c534b96bd9e35ad08551c5..02c1ea462a3c670f91180ab4e92f84c893406135 100644 (file)
@@ -20,8 +20,6 @@ from quantum.openstack.common import cfg
 ovs_opts = [
     cfg.StrOpt('integration_bridge', default='br-int',
                help=_("Integration bridge to use")),
-    cfg.StrOpt('openflow_controller', default='127.0.0.1:6633',
-               help=_("OpenFlow controller to connect to")),
     cfg.StrOpt('openflow_rest_api', default='127.0.0.1:8080',
                help=_("OpenFlow REST API location")),
     cfg.IntOpt('tunnel_key_min', default=1,
index afffd8c7397dfb61614016cb3f9378db2dd6b38f..4ebd27eece3de6adf8a15acaafccd9ac1a4cff9c 100644 (file)
@@ -28,16 +28,6 @@ from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
 LOG = logging.getLogger(__name__)
 
 
-def set_ofp_servers(hosts):
-    session = db.get_session()
-    session.query(ryu_models_v2.OFPServer).delete()
-    for (host_address, host_type) in hosts:
-        host = ryu_models_v2.OFPServer(address=host_address,
-                                       host_type=host_type)
-        session.add(host)
-    session.flush()
-
-
 def network_all_tenant_list():
     session = db.get_session()
     return session.query(models_v2.Network).all()
index 1f335e54549ade70a27eb6d709125e99d1f53fb8..079350da3c85f4fb177671c861c37649d65afa95 100644 (file)
@@ -19,20 +19,6 @@ import sqlalchemy as sa
 from quantum.db import model_base
 
 
-class OFPServer(model_base.BASEV2):
-    """Openflow Server/API address."""
-    __tablename__ = 'ofp_server'
-
-    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
-    address = sa.Column(sa.String(64))        # netloc <host ip address>:<port>
-    host_type = sa.Column(sa.String(255))     # server type
-                                              # Controller, REST_API
-
-    def __repr__(self):
-        return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
-                                          self.host_type)
-
-
 class TunnelKeyLast(model_base.BASEV2):
     """Lastly allocated Tunnel key. The next key allocation will be started
     from this value + 1
diff --git a/quantum/plugins/ryu/ofp_service_type.py b/quantum/plugins/ryu/ofp_service_type.py
deleted file mode 100644 (file)
index 86615ec..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2012 Isaku Yamahata <yamahata at valinux co jp>
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-# @author: Isaku Yamahata
-
-CONTROLLER = 'controller'
-REST_API = 'REST_API'
index 55460a01a170895bf38ac2ca02c648926b14315f..fbb48f83517f6354c22b7364b497af0b615f6e11 100644 (file)
@@ -34,7 +34,6 @@ from quantum.openstack.common import log as logging
 from quantum.openstack.common import rpc
 from quantum.plugins.ryu.common import config
 from quantum.plugins.ryu.db import api_v2 as db_api_v2
-from quantum.plugins.ryu import ofp_service_type
 
 
 LOG = logging.getLogger(__name__)
@@ -45,9 +44,16 @@ class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
 
     RPC_API_VERSION = '1.0'
 
+    def __init__(self, ofp_rest_api_addr):
+        self.ofp_rest_api_addr = ofp_rest_api_addr
+
     def create_rpc_dispatcher(self):
         return q_rpc.PluginRpcDispatcher([self])
 
+    def get_ofp_rest_api(self, context, **kwargs):
+        LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
+        return self.ofp_rest_api_addr
+
 
 class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                          l3_db.L3_NAT_db_mixin):
@@ -59,19 +65,13 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
 
         self.tunnel_key = db_api_v2.TunnelKey(
             cfg.CONF.OVS.tunnel_key_min, cfg.CONF.OVS.tunnel_key_max)
-        ofp_con_host = cfg.CONF.OVS.openflow_controller
-        ofp_api_host = cfg.CONF.OVS.openflow_rest_api
-
-        if ofp_con_host is None or ofp_api_host is None:
+        self.ofp_api_host = cfg.CONF.OVS.openflow_rest_api
+        if not self.ofp_api_host:
             raise q_exc.Invalid(_('Invalid configuration. check ryu.ini'))
 
-        hosts = [(ofp_con_host, ofp_service_type.CONTROLLER),
-                 (ofp_api_host, ofp_service_type.REST_API)]
-        db_api_v2.set_ofp_servers(hosts)
-
-        self.client = client.OFPClient(ofp_api_host)
-        self.tun_client = client.TunnelClient(ofp_api_host)
-        self.iface_client = client.QuantumIfaceClient(ofp_api_host)
+        self.client = client.OFPClient(self.ofp_api_host)
+        self.tun_client = client.TunnelClient(self.ofp_api_host)
+        self.iface_client = client.QuantumIfaceClient(self.ofp_api_host)
         for nw_id in rest_nw_id.RESERVED_NETWORK_IDS:
             if nw_id != rest_nw_id.NW_ID_UNKNOWN:
                 self.client.update_network(nw_id)
@@ -82,7 +82,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
 
     def _setup_rpc(self):
         self.conn = rpc.create_connection(new=True)
-        self.callbacks = RyuRpcCallbacks()
+        self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False)
         self.conn.consume_in_thread()
index 29a34f715240f43788f327572420bc405d72239b..5b203eefd34c443503f665144bf5e15fd2a59c83 100644 (file)
@@ -31,5 +31,4 @@ class ConfigurationTest(unittest2.TestCase):
         self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
         self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
         self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
-        self.assertEqual('127.0.0.1:6633', cfg.CONF.OVS.openflow_controller)
         self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api)
index 6c3dad93c1000e1976e91734c539554a5a369e2f..c93a8b15f9f5bda6eb6b71c67400e1863bca58f7 100644 (file)
@@ -24,27 +24,10 @@ from quantum.openstack.common import cfg
 from quantum.plugins.ryu.common import config
 from quantum.plugins.ryu.db import api_v2 as db_api_v2
 from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
-from quantum.plugins.ryu import ofp_service_type
 from quantum.tests.unit import test_db_plugin as test_plugin
 
 
 class RyuDBTest(test_plugin.QuantumDbPluginV2TestCase):
-    def setUp(self):
-        super(RyuDBTest, self).setUp()
-        self.hosts = [(cfg.CONF.OVS.openflow_controller,
-                       ofp_service_type.CONTROLLER),
-                      (cfg.CONF.OVS.openflow_rest_api,
-                       ofp_service_type.REST_API)]
-        db_api_v2.set_ofp_servers(self.hosts)
-
-    def test_ofp_server(self):
-        session = db.get_session()
-        servers = session.query(ryu_models_v2.OFPServer).all()
-        print servers
-        self.assertEqual(len(servers), 2)
-        for s in servers:
-            self.assertTrue((s.address, s.host_type) in self.hosts)
-
     @staticmethod
     def _tunnel_key_sort(key_list):
         key_list.sort(key=operator.attrgetter('tunnel_key'))