import logging as LOG
from optparse import OptionParser
-import shlex
-import signal
-from subprocess import PIPE, Popen
import sys
import time
from ryu.app.client import OFPClient
from sqlalchemy.ext.sqlsoup import SqlSoup
-from quantum.agent.linux import utils
+from quantum.agent.linux import ovs_lib
+from quantum.agent.linux.ovs_lib import VifPort
from quantum.plugins.ryu.common import config
OP_STATUS_UP = "UP"
OP_STATUS_DOWN = "DOWN"
-class VifPort:
- """
- A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
- attributes set).
- """
- def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
- self.port_name = port_name
- self.ofport = ofport
- self.vif_id = vif_id
- self.vif_mac = vif_mac
- self.switch = switch
-
- def __str__(self):
- return ("iface-id=%s, vif_mac=%s, port_name=%s, ofport=%s, "
- "bridge name = %s" % (self.vif_id,
- self.vif_mac,
- self.port_name,
- self.ofport,
- self.switch.br_name))
-
-
-class OVSBridge:
+class OVSBridge(ovs_lib.OVSBridge):
def __init__(self, br_name, root_helper):
- self.br_name = br_name
- self.root_helper = root_helper
+ ovs_lib.OVSBridge.__init__(self, br_name, root_helper)
self.datapath_id = None
def find_datapath_id(self):
dp_id = res.strip().strip('"')
self.datapath_id = dp_id
- def run_vsctl(self, args):
- full_args = ["ovs-vsctl", "--timeout=2"] + args
- return utils.execute(full_args, root_helper=self.root_helper)
-
def set_controller(self, target):
methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix")
args = target.split(":")
target = "tcp:" + target
self.run_vsctl(["set-controller", self.br_name, target])
- def db_get_map(self, table, record, column):
- str_ = self.run_vsctl(["get", table, record, column]).rstrip("\n\r")
- return self.db_str_to_map(str_)
-
- def db_get_val(self, table, record, column):
- return self.run_vsctl(["get", table, record, column]).rstrip("\n\r")
-
- @staticmethod
- def db_str_to_map(full_str):
- list = full_str.strip("{}").split(", ")
- ret = {}
- for elem in list:
- if elem.find("=") == -1:
- continue
- arr = elem.split("=")
- ret[arr[0]] = arr[1].strip("\"")
- return ret
-
- def get_port_name_list(self):
- res = self.run_vsctl(["list-ports", self.br_name])
- return res.split("\n")[:-1]
-
- def get_xapi_iface_id(self, xs_vif_uuid):
- return utils.execute(["xe", "vif-param-get",
- "param-name=other-config",
- "param-key=nicira-iface-id",
- "uuid=%s" % xs_vif_uuid],
- root_helper=self.root_helper).strip()
-
def _vifport(self, name, external_ids):
ofport = self.db_get_val("Interface", name, "ofport")
return VifPort(name, ofport, external_ids["iface-id"],
class OVSQuantumOFPRyuAgent:
- def __init__(self, integ_br, db, root_helper):
+ def __init__(self, integ_br, db, root_helper, target_v2_api=False):
self.root_helper = root_helper
(ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db)
self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
self.api = OFPClient(ofp_rest_api_addr)
+ self.target_v2_api = target_v2_api
self._setup_integration_br(integ_br, ofp_controller_addr)
def _setup_integration_br(self, integ_br, ofp_controller_addr):
def _all_bindings(self, db):
"""return interface id -> port which include network id bindings"""
- return dict((port.interface_id, port) for port in db.ports.all())
+ if self.target_v2_api:
+ return dict((port.device_id, port) for port in db.ports.all())
+ else:
+ return dict((port.interface_id, port) for port in db.ports.all())
+
+ def _set_port_status(self, port, status):
+ if self.target_v2_api:
+ port.status = status
+ else:
+ port.op_status = status
def daemon_loop(self, db):
# on startup, register all existing ports
net_id = all_bindings[port.vif_id].network_id
local_bindings[port.vif_id] = net_id
self._port_update(net_id, port)
- all_bindings[port.vif_id].op_status = OP_STATUS_UP
+ self._set_port_status(all_bindings[port.vif_id], OP_STATUS_UP)
LOG.info("Updating binding to net-id = %s for %s",
net_id, str(port))
db.commit()
LOG.info("Removing binding to net-id = %s for %s",
old_b, str(port))
if port.vif_id in all_bindings:
- all_bindings[port.vif_id].op_status = OP_STATUS_DOWN
+ self._set_port_status(all_bindings[port.vif_id],
+ OP_STATUS_DOWN)
if not new_b:
if port.vif_id in all_bindings:
- all_bindings[port.vif_id].op_status = OP_STATUS_UP
+ self._set_port_status(all_bindings[port.vif_id],
+ OP_STATUS_UP)
LOG.info("Adding binding to net-id = %s for %s",
new_b, str(port))
if vif_id not in new_vif_ports:
LOG.info("Port Disappeared: %s", vif_id)
if vif_id in all_bindings:
- all_bindings[vif_id].op_status = OP_STATUS_DOWN
+ self._set_port_status(all_bindings[port.vif_id],
+ OP_STATUS_DOWN)
old_vif_ports = new_vif_ports
old_local_bindings = new_local_bindings
conf = config.parse(config_file)
integ_br = conf.OVS.integration_bridge
root_helper = conf.AGENT.root_helper
+ target_v2_api = conf.AGENT.target_v2_api
options = {"sql_connection": conf.DATABASE.sql_connection}
db = SqlSoup(options["sql_connection"])
LOG.info("Connecting to database \"%s\" on %s",
db.engine.url.database, db.engine.url.host)
- plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper)
+ plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper, target_v2_api)
plugin.daemon_loop(db)
sys.exit(0)
]
agent_opts = [
+ cfg.BoolOpt('target_v2_api', default=True),
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import quantum.db.api as db
+from quantum.db.models_v2 import Network
+from quantum.plugins.ryu.db import models_v2
+
+
+def set_ofp_servers(hosts):
+ session = db.get_session()
+ session.query(models_v2.OFPServer).delete()
+ for (host_address, host_type) in hosts:
+ host = models_v2.OFPServer(host_address, host_type)
+ session.add(host)
+ session.flush()
+
+
+def network_all_tenant_list():
+ session = db.get_session()
+ return session.query(Network).all()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, Integer, String
+
+from quantum.db import models_v2
+
+
+class OFPServer(models_v2.model_base.BASEV2):
+ """Openflow Server/API address"""
+ __tablename__ = 'ofp_server'
+
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ address = Column(String(255)) # netloc <host ip address>:<port>
+ host_type = Column(String(255)) # server type
+ # Controller, REST_API
+
+ def __init__(self, address, host_type):
+ self.address = address
+ self.host_type = host_type
+
+ def __repr__(self):
+ return "<OFPServer(%s,%s,%s)>" % (self.id, self.address,
+ self.host_type)
dev = self.get_dev_name(iface_id)
return _get_port_no(dev)
- def plug(self, instance, network, mapping):
+ def plug(self, instance, vif):
result = super(LibvirtOpenVswitchOFPRyuDriver, self).plug(
- instance, network, mapping)
+ instance, vif)
+ network, mapping = vif
port_no = self._get_port_no(mapping)
- self.ryu_client.create_port(network['id'], self.datapath_id, port_no)
+ try:
+ self.ryu_client.create_port(network['id'], self.datapath_id,
+ port_no)
+ except httplib.HTTPException as e:
+ res = e.args[0]
+ if res.status != httplib.CONFLICT:
+ raise
return result
- def unplug(self, instance, network, mapping):
+ def unplug(self, instance, vif):
+ network, mapping = vif
port_no = self._get_port_no(mapping)
try:
self.ryu_client.delete_port(network['id'],
res = e.args[0]
if res.status != httplib.NOT_FOUND:
raise
- super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, network,
- mapping)
+ super(LibvirtOpenVswitchOFPRyuDriver, self).unplug(instance, vif)
# under the License.
# @author: Isaku Yamahata
+import logging
+import os
+
from ryu.app import client
from ryu.app import rest_nw_id
from quantum.common import exceptions as q_exc
from quantum.common.utils import find_config_file
-import quantum.db.api as db
+from quantum.db import api as db
+from quantum.db import db_base_plugin_v2
+from quantum.db import models_v2
from quantum.plugins.ryu.db import api as db_api
+from quantum.plugins.ryu.db import api_v2 as db_api_v2
from quantum.plugins.ryu import ofp_service_type
from quantum.plugins.ryu import ovs_quantum_plugin_base
+from quantum.plugins.ryu.common import config
-
+LOG = logging.getLogger(__name__)
CONF_FILE = find_config_file({"plugin": "ryu"}, "ryu.ini")
def __init__(self, configfile=None):
super(RyuQuantumPlugin, self).__init__(CONF_FILE, __file__, configfile)
self.driver = OFPRyuDriver(self.conf)
+
+
+class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
+ def __init__(self, configfile=None):
+ if configfile is None:
+ if os.path.exists(CONF_FILE):
+ configfile = CONF_FILE
+ if configfile is None:
+ raise Exception("Configuration file \"%s\" doesn't exist" %
+ (configfile))
+ LOG.debug("Using configuration file: %s" % configfile)
+ conf = config.parse(configfile)
+ options = {"sql_connection": conf.DATABASE.sql_connection}
+ options.update({'base': models_v2.model_base.BASEV2})
+ reconnect_interval = conf.DATABASE.reconnect_interval
+ options.update({"reconnect_interval": reconnect_interval})
+ db.configure_db(options)
+
+ ofp_con_host = conf.OVS.openflow_controller
+ ofp_api_host = conf.OVS.openflow_rest_api
+
+ if ofp_con_host is None or ofp_api_host is None:
+ raise q_exc.Invalid("invalid configuration. check ryu.ini")
+
+ hosts = [(ofp_con_host, ofp_service_type.CONTROLLER),
+ (ofp_api_host, ofp_service_type.REST_API)]
+ db_api_v2.set_ofp_servers(hosts)
+
+ self.client = client.OFPClient(ofp_api_host)
+ self.client.update_network(rest_nw_id.NW_ID_EXTERNAL)
+
+ # register known all network list on startup
+ self._create_all_tenant_network()
+
+ def _create_all_tenant_network(self):
+ networks = db_api_v2.network_all_tenant_list()
+ for net in networks:
+ self.client.update_network(net.id)
+
+ def create_network(self, context, network):
+ net = super(RyuQuantumPluginV2, self).create_network(context, network)
+ self.client.create_network(net['id'])
+ return net
+
+ def delete_network(self, context, id):
+ self.client.delete_network(id)
+ return super(RyuQuantumPluginV2, self).delete_network(context, id)