]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add native OVSDB implementation of OVSDB API
authorTerry Wilson <twilson@redhat.com>
Thu, 22 Jan 2015 11:39:57 +0000 (05:39 -0600)
committerTerry Wilson <twilson@redhat.com>
Fri, 20 Mar 2015 13:40:35 +0000 (13:40 +0000)
This patch adds a native OVSDB protocol version of the new OVSDB
API. As such, it does not require using sudo/rootwrap but instead
uses the OVS IDL Python library that ships with openvswitch.

Doc-Impact
Implements: blueprint vsctl-to-ovsdb
Change-Id: I2bf5ba7f4607cec324ea841f0fea7bad274faffc

etc/neutron/plugins/openvswitch/ovs_neutron_plugin.ini
neutron/agent/linux/ovs_lib.py
neutron/agent/ovsdb/api.py
neutron/agent/ovsdb/impl_idl.py [new file with mode: 0644]
neutron/agent/ovsdb/native/__init__.py [new file with mode: 0644]
neutron/agent/ovsdb/native/commands.py [new file with mode: 0644]
neutron/agent/ovsdb/native/connection.py [new file with mode: 0644]
neutron/agent/ovsdb/native/idlutils.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/base.py
neutron/tests/functional/agent/test_ovs_lib.py
neutron/tests/unit/agent/linux/test_ovs_lib.py

index 7196b9a9c747b6989e57ff58651720dc95dce8bc..072bd7293b9391aa7c8f0bde7a3d6e9720523c36 100644 (file)
 # so long as it is set to True.
 # use_veth_interconnection = False
 
+# (StrOpt) Which OVSDB backend to use, defaults to 'vsctl'
+# vsctl - The backend based on executing ovs-vsctl
+# native - The backend based on using native OVSDB
+# ovsdb_interface = vsctl
+
+# (StrOpt) The connection string for the native OVSDB backend
+# To enable ovsdb-server to listen on port 6640:
+#   ovs-vsctl set-manager ptcp:6640:127.0.0.1
+# ovsdb_connection = tcp:127.0.0.1:6640
+
 [agent]
 # Agent's polling interval in seconds
 # polling_interval = 2
index 531232388f0104d58b28b21423c1772f0ba6c325..8ab1888bc015b008343bc654419ddbc5b4bebcb1 100644 (file)
@@ -102,7 +102,10 @@ class BaseOVS(object):
 
     def add_bridge(self, bridge_name):
         self.ovsdb.add_br(bridge_name).execute()
-        return OVSBridge(bridge_name)
+        br = OVSBridge(bridge_name)
+        # Don't return until vswitchd sets up the internal port
+        br.get_port_ofport(bridge_name)
+        return br
 
     def delete_bridge(self, bridge_name):
         self.ovsdb.del_br(bridge_name).execute()
@@ -162,6 +165,8 @@ class OVSBridge(BaseOVS):
 
     def create(self):
         self.ovsdb.add_br(self.br_name).execute()
+        # Don't return until vswitchd sets up the internal port
+        self.get_port_ofport(self.br_name)
 
     def destroy(self):
         self.delete_bridge(self.br_name)
@@ -191,6 +196,8 @@ class OVSBridge(BaseOVS):
             if interface_attr_tuples:
                 txn.add(self.ovsdb.db_set('Interface', port_name,
                                           *interface_attr_tuples))
+        # Don't return until the port has been assigned by vswitchd
+        self.get_port_ofport(port_name)
 
     def delete_port(self, port_name):
         self.ovsdb.del_port(port_name, self.br_name).execute()
index 10d365acff223e0ca521529f32bb17a35554dffb..2d6634ae695fa1d51bb131d69661d37c816e2056 100644 (file)
@@ -20,6 +20,7 @@ import six
 
 interface_map = {
     'vsctl': 'neutron.agent.ovsdb.impl_vsctl.OvsdbVsctl',
+    'native': 'neutron.agent.ovsdb.impl_idl.OvsdbIdl',
 }
 
 OPTS = [
diff --git a/neutron/agent/ovsdb/impl_idl.py b/neutron/agent/ovsdb/impl_idl.py
new file mode 100644 (file)
index 0000000..49d9605
--- /dev/null
@@ -0,0 +1,195 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import Queue
+import time
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_utils import excutils
+from ovs.db import idl
+
+from neutron.agent.ovsdb import api
+from neutron.agent.ovsdb.native import commands as cmd
+from neutron.agent.ovsdb.native import connection
+from neutron.agent.ovsdb.native import idlutils
+from neutron.i18n import _LE
+
+
+OPTS = [
+    cfg.StrOpt('ovsdb_connection',
+               default='tcp:127.0.0.1:6640',
+               help=_('The connection string for the native OVSDB backend')),
+]
+cfg.CONF.register_opts(OPTS, 'OVS')
+# TODO(twilson) DEFAULT.ovs_vsctl_timeout should be OVS.vsctl_timeout
+cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.linux.ovs_lib')
+
+LOG = logging.getLogger(__name__)
+
+
+ovsdb_connection = connection.Connection(cfg.CONF.OVS.ovsdb_connection,
+                                         cfg.CONF.ovs_vsctl_timeout)
+
+
+class Transaction(api.Transaction):
+    def __init__(self, context, api, check_error=False, log_errors=False):
+        self.context = context
+        self.api = api
+        self.check_error = check_error
+        self.log_errors = log_errors
+        self.commands = []
+        self.results = Queue.Queue(1)
+
+    def add(self, command):
+        """Add a command to the transaction
+
+        returns The command passed as a convenience
+        """
+
+        self.commands.append(command)
+        return command
+
+    def commit(self):
+        ovsdb_connection.queue_txn(self)
+        result = self.results.get()
+        if isinstance(result, Exception) and self.check_error:
+            raise result
+        return result
+
+    def do_commit(self):
+        start_time = time.time()
+        attempts = 0
+        while True:
+            elapsed_time = time.time() - start_time
+            if attempts > 0 and elapsed_time > self.context.vsctl_timeout:
+                raise RuntimeError("OVS transaction timed out")
+            attempts += 1
+            # TODO(twilson) Make sure we don't loop longer than vsctl_timeout
+            txn = idl.Transaction(self.api.idl)
+            for i, command in enumerate(self.commands):
+                LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s",
+                          {'idx': i, 'cmd': command})
+                try:
+                    command.run_idl(txn)
+                except Exception:
+                    with excutils.save_and_reraise_exception() as ctx:
+                        txn.abort()
+                        if not self.check_error:
+                            ctx.reraise = False
+            seqno = self.api.idl.change_seqno
+            status = txn.commit_block()
+            if status == txn.TRY_AGAIN:
+                LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying")
+                if self.api.idl._session.rpc.status != 0:
+                    LOG.debug("Lost connection to OVSDB, reconnecting!")
+                    self.api.idl.force_reconnect()
+                idlutils.wait_for_change(
+                    self.api.idl, self.context.vsctl_timeout - elapsed_time,
+                    seqno)
+                continue
+            elif status == txn.ERROR:
+                msg = _LE("OVSDB Error: %s") % txn.get_error()
+                if self.log_errors:
+                    LOG.error(msg)
+                if self.check_error:
+                    # For now, raise similar error to vsctl/utils.execute()
+                    raise RuntimeError(msg)
+                return
+            elif status == txn.ABORTED:
+                LOG.debug("Transaction aborted")
+                return
+            elif status == txn.UNCHANGED:
+                LOG.debug("Transaction caused no change")
+
+            return [cmd.result for cmd in self.commands]
+
+
+class OvsdbIdl(api.API):
+    def __init__(self, context):
+        super(OvsdbIdl, self).__init__(context)
+        ovsdb_connection.start()
+        self.idl = ovsdb_connection.idl
+
+    @property
+    def _tables(self):
+        return self.idl.tables
+
+    @property
+    def _ovs(self):
+        return self._tables['Open_vSwitch'].rows.values()[0]
+
+    def transaction(self, check_error=False, log_errors=True, **kwargs):
+        return Transaction(self.context, self, check_error, log_errors)
+
+    def add_br(self, name, may_exist=True):
+        return cmd.AddBridgeCommand(self, name, may_exist)
+
+    def del_br(self, name, if_exists=True):
+        return cmd.DelBridgeCommand(self, name, if_exists)
+
+    def br_exists(self, name):
+        return cmd.BridgeExistsCommand(self, name)
+
+    def port_to_br(self, name):
+        return cmd.PortToBridgeCommand(self, name)
+
+    def iface_to_br(self, name):
+        # For our purposes, ports and interfaces always have the same name
+        return cmd.PortToBridgeCommand(self, name)
+
+    def list_br(self):
+        return cmd.ListBridgesCommand(self)
+
+    def br_get_external_id(self, name, field):
+        return cmd.BrGetExternalIdCommand(self, name, field)
+
+    def br_set_external_id(self, name, field, value):
+        return cmd.BrSetExternalIdCommand(self, name, field, value)
+
+    def db_set(self, table, record, *col_values):
+        return cmd.DbSetCommand(self, table, record, *col_values)
+
+    def db_clear(self, table, record, column):
+        return cmd.DbClearCommand(self, table, record, column)
+
+    def db_get(self, table, record, column):
+        return cmd.DbGetCommand(self, table, record, column)
+
+    def db_list(self, table, records=None, columns=None, if_exists=False):
+        return cmd.DbListCommand(self, table, records, columns, if_exists)
+
+    def db_find(self, table, *conditions, **kwargs):
+        return cmd.DbFindCommand(self, table, *conditions, **kwargs)
+
+    def set_controller(self, bridge, controllers):
+        return cmd.SetControllerCommand(self, bridge, controllers)
+
+    def del_controller(self, bridge):
+        return cmd.DelControllerCommand(self, bridge)
+
+    def get_controller(self, bridge):
+        return cmd.GetControllerCommand(self, bridge)
+
+    def set_fail_mode(self, bridge, mode):
+        return cmd.SetFailModeCommand(self, bridge, mode)
+
+    def add_port(self, bridge, port, may_exist=True):
+        return cmd.AddPortCommand(self, bridge, port, may_exist)
+
+    def del_port(self, port, bridge=None, if_exists=True):
+        return cmd.DelPortCommand(self, port, bridge, if_exists)
+
+    def list_ports(self, bridge):
+        return cmd.ListPortsCommand(self, bridge)
diff --git a/neutron/agent/ovsdb/native/__init__.py b/neutron/agent/ovsdb/native/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/agent/ovsdb/native/commands.py b/neutron/agent/ovsdb/native/commands.py
new file mode 100644 (file)
index 0000000..5928e12
--- /dev/null
@@ -0,0 +1,406 @@
+# Copyright (c) 2015 Openstack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+
+from oslo_log import log as logging
+from oslo_utils import excutils
+
+from neutron.agent.ovsdb import api
+from neutron.agent.ovsdb.native import idlutils
+from neutron.common import exceptions
+from neutron.i18n import _LE
+
+LOG = logging.getLogger(__name__)
+
+
+class RowNotFound(exceptions.NeutronException):
+    message = _("Table %(table)s has no row with %(col)s=%(match)s")
+
+
+class BaseCommand(api.Command):
+    def __init__(self, api):
+        self.api = api
+        self.result = None
+
+    def execute(self, check_error=False, log_errors=True):
+        try:
+            with self.api.transaction(check_error, log_errors) as txn:
+                txn.add(self)
+            return self.result
+        except Exception:
+            with excutils.save_and_reraise_exception() as ctx:
+                if log_errors:
+                    LOG.exception(_LE("Error executing command"))
+                if not check_error:
+                    ctx.reraise = False
+
+    def row_by_index(self, table, match, *default):
+        tab = self.api._tables[table]
+        idx = idlutils.get_index_column(tab)
+        return self.row_by_value(table, idx, match, *default)
+
+    def row_by_value(self, table, column, match, *default):
+        tab = self.api._tables[table]
+        try:
+            return next(r for r in tab.rows.values()
+                        if getattr(r, column) == match)
+        except StopIteration:
+            if len(default) == 1:
+                return default[0]
+            else:
+                raise RowNotFound(table=table, col=column, match=match)
+
+    def __str__(self):
+        command_info = self.__dict__
+        return "%s(%s)" % (
+            self.__class__.__name__,
+            ", ".join("%s=%s" % (k, v) for k, v in command_info.items()
+                      if k not in ['api', 'result']))
+
+
+class AddBridgeCommand(BaseCommand):
+    def __init__(self, api, name, may_exist):
+        super(AddBridgeCommand, self).__init__(api)
+        self.name = name
+        self.may_exist = may_exist
+
+    def run_idl(self, txn):
+        if self.may_exist:
+            br = self.row_by_value('Bridge', 'name', self.name, None)
+            if br:
+                return
+        row = txn.insert(self.api._tables['Bridge'])
+        row.name = self.name
+        self.api._ovs.verify('bridges')
+        self.api._ovs.bridges = self.api._ovs.bridges + [row]
+
+        # Add the internal bridge port
+        cmd = AddPortCommand(self.api, self.name, self.name, self.may_exist)
+        cmd.run_idl(txn)
+
+        cmd = DbSetCommand(self.api, 'Interface', self.name,
+                           ('type', 'internal'))
+        cmd.run_idl(txn)
+
+
+class DelBridgeCommand(BaseCommand):
+    def __init__(self, api, name, if_exists):
+        super(DelBridgeCommand, self).__init__(api)
+        self.name = name
+        self.if_exists = if_exists
+
+    def run_idl(self, txn):
+        try:
+            br = self.row_by_value('Bridge', 'name', self.name)
+        except RowNotFound:
+            if self.if_exists:
+                return
+            else:
+                msg = _LE("Bridge %s does not exist") % self.name
+                LOG.error(msg)
+                raise RuntimeError(msg)
+        self.api._ovs.verify('bridges')
+        for port in br.ports:
+            cmd = DelPortCommand(self.api, port.name, self.name,
+                                 if_exists=True)
+            cmd.run_idl(txn)
+        bridges = self.api._ovs.bridges
+        bridges.remove(br)
+        self.api._ovs.bridges = bridges
+        del self.api._tables['Bridge'].rows[br.uuid]
+
+
+class BridgeExistsCommand(BaseCommand):
+    def __init__(self, api, name):
+        super(BridgeExistsCommand, self).__init__(api)
+        self.name = name
+
+    def run_idl(self, txn):
+        self.result = bool(self.row_by_value('Bridge',
+                                             'name', self.name, None))
+
+
+class ListBridgesCommand(BaseCommand):
+    def __init__(self, api):
+        super(ListBridgesCommand, self).__init__(api)
+
+    def run_idl(self, txn):
+        # NOTE (twilson) [x.name for x in rows.values()] if no index
+        self.result = [x.name for x in
+                       self.api._tables['Bridge'].rows.values()]
+
+
+class BrGetExternalIdCommand(BaseCommand):
+    def __init__(self, api, name, field):
+        super(BrGetExternalIdCommand, self).__init__(api)
+        self.name = name
+        self.field = field
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.name)
+        self.result = br.external_ids[self.field]
+
+
+class BrSetExternalIdCommand(BaseCommand):
+    def __init__(self, api, name, field, value):
+        super(BrSetExternalIdCommand, self).__init__(api)
+        self.name = name
+        self.field = field
+        self.value = value
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.name)
+        external_ids = getattr(br, 'external_ids', {})
+        external_ids[self.field] = self.value
+        br.external_ids = external_ids
+
+
+class DbSetCommand(BaseCommand):
+    def __init__(self, api, table, record, *col_values):
+        super(DbSetCommand, self).__init__(api)
+        self.table = table
+        self.record = record
+        self.col_values = col_values
+
+    def run_idl(self, txn):
+        record = self.row_by_index(self.table, self.record)
+        for col, val in self.col_values:
+            # TODO(twilson) Ugh, the OVS library doesn't like OrderedDict
+            # We're only using it to make a unit test work, so we should fix
+            # this soon.
+            if isinstance(val, collections.OrderedDict):
+                val = dict(val)
+            setattr(record, col, val)
+
+
+class DbClearCommand(BaseCommand):
+    def __init__(self, api, table, record, column):
+        super(DbClearCommand, self).__init__(api)
+        self.table = table
+        self.record = record
+        self.column = column
+
+    def run_idl(self, txn):
+        record = self.row_by_index(self.table, self.record)
+        # Create an empty value of the column type
+        value = type(getattr(record, self.column))()
+        setattr(record, self.column, value)
+
+
+class DbGetCommand(BaseCommand):
+    def __init__(self, api, table, record, column):
+        super(DbGetCommand, self).__init__(api)
+        self.table = table
+        self.record = record
+        self.column = column
+
+    def run_idl(self, txn):
+        record = self.row_by_index(self.table, self.record)
+        # TODO(twilson) This feels wrong, but ovs-vsctl returns single results
+        # on set types without the list. The IDL is returning them as lists,
+        # even if the set has the maximum number of items set to 1. Might be
+        # able to inspect the Schema and just do this conversion for that case.
+        result = getattr(record, self.column)
+        if isinstance(result, list) and len(result) == 1:
+            self.result = result[0]
+        else:
+            self.result = result
+
+
+class SetControllerCommand(BaseCommand):
+    def __init__(self, api, bridge, targets):
+        super(SetControllerCommand, self).__init__(api)
+        self.bridge = bridge
+        self.targets = targets
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        controllers = []
+        for target in self.targets:
+            controller = txn.insert(self.api._tables['Controller'])
+            controller.target = target
+            controllers.append(controller)
+        br.verify('controller')
+        br.controller = controllers
+
+
+class DelControllerCommand(BaseCommand):
+    def __init__(self, api, bridge):
+        super(DelControllerCommand, self).__init__(api)
+        self.bridge = bridge
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        br.controller = []
+
+
+class GetControllerCommand(BaseCommand):
+    def __init__(self, api, bridge):
+        super(GetControllerCommand, self).__init__(api)
+        self.bridge = bridge
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        br.verify('controller')
+        self.result = [c.target for c in br.controller]
+
+
+class SetFailModeCommand(BaseCommand):
+    def __init__(self, api, bridge, mode):
+        super(SetFailModeCommand, self).__init__(api)
+        self.bridge = bridge
+        self.mode = mode
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        br.verify('fail_mode')
+        br.fail_mode = self.mode
+
+
+class AddPortCommand(BaseCommand):
+    def __init__(self, api, bridge, port, may_exist):
+        super(AddPortCommand, self).__init__(api)
+        self.bridge = bridge
+        self.port = port
+        self.may_exist = may_exist
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        if self.may_exist:
+            port = self.row_by_value('Port', 'name', self.port, None)
+            if port:
+                return
+        port = txn.insert(self.api._tables['Port'])
+        port.name = self.port
+        br.verify('ports')
+        ports = getattr(br, 'ports', [])
+        ports.append(port)
+        br.ports = ports
+
+        iface = txn.insert(self.api._tables['Interface'])
+        iface.name = self.port
+        port.verify('interfaces')
+        ifaces = getattr(port, 'interfaces', [])
+        ifaces.append(iface)
+        port.interfaces = ifaces
+
+
+class DelPortCommand(BaseCommand):
+    def __init__(self, api, port, bridge, if_exists):
+        super(DelPortCommand, self).__init__(api)
+        self.port = port
+        self.bridge = bridge
+        self.if_exists = if_exists
+
+    def run_idl(self, txn):
+        try:
+            port = self.row_by_value('Port', 'name', self.port)
+        except RowNotFound:
+            if self.if_exists:
+                return
+            msg = _LE("Port %s does not exist") % self.port
+            raise RuntimeError(msg)
+        if self.bridge:
+            br = self.row_by_value('Bridge', 'name', self.bridge)
+        else:
+            br = next(b for b in self.api._tables['Bridge'].rows.values()
+                      if port in b.ports)
+
+        if port.uuid not in br.ports and not self.if_exists:
+            # TODO(twilson) Make real errors across both implementations
+            msg = _LE("Port %(port)s does not exist on %(bridge)s!") % {
+                'port': self.name, 'bridge': self.bridge
+            }
+            LOG.error(msg)
+            raise RuntimeError(msg)
+
+        br.verify('ports')
+        ports = br.ports
+        ports.remove(port)
+        br.ports = ports
+
+        # Also remove port/interface directly for indexing?
+        port.verify('interfaces')
+        for iface in port.interfaces:
+            del self.api._tables['Interface'].rows[iface.uuid]
+        del self.api._tables['Port'].rows[port.uuid]
+
+
+class ListPortsCommand(BaseCommand):
+    def __init__(self, api, bridge):
+        super(ListPortsCommand, self).__init__(api)
+        self.bridge = bridge
+
+    def run_idl(self, txn):
+        br = self.row_by_value('Bridge', 'name', self.bridge)
+        self.result = [p.name for p in br.ports if p.name != self.bridge]
+
+
+class PortToBridgeCommand(BaseCommand):
+    def __init__(self, api, name):
+        super(PortToBridgeCommand, self).__init__(api)
+        self.name = name
+
+    def run_idl(self, txn):
+        # TODO(twilson) This is expensive!
+        # This traversal of all ports could be eliminated by caching the bridge
+        # name on the Port's (or Interface's for iface_to_br) external_id field
+        # In fact, if we did that, the only place that uses to_br functions
+        # could just add the external_id field to the conditions passed to find
+        port = self.row_by_value('Port', 'name', self.name)
+        bridges = self.api._tables['Bridge'].rows.values()
+        self.result = next(br.name for br in bridges if port in br.ports)
+
+
+class DbListCommand(BaseCommand):
+    def __init__(self, api, table, records, columns, if_exists):
+        super(DbListCommand, self).__init__(api)
+        self.table = self.api._tables[table]
+        self.columns = columns or self.table.columns.keys() + ['_uuid']
+        self.if_exists = if_exists
+        idx = idlutils.get_index_column(self.table)
+        if records:
+            self.records = [uuid for uuid, row in self.table.rows.items()
+                            if getattr(row, idx) in records]
+        else:
+            self.records = self.table.rows.keys()
+
+    def run_idl(self, txn):
+        self.result = [
+            {
+                c: idlutils.get_column_value(self.table.rows[uuid], c)
+                for c in self.columns
+            }
+            for uuid in self.records
+        ]
+
+
+class DbFindCommand(BaseCommand):
+    def __init__(self, api, table, *conditions, **kwargs):
+        super(DbFindCommand, self).__init__(api)
+        self.table = self.api._tables[table]
+        self.conditions = conditions
+        self.columns = (kwargs.get('columns') or
+                        self.table.columns.keys() + ['_uuid'])
+
+    def run_idl(self, txn):
+        self.result = [
+            {
+                c: idlutils.get_column_value(r, c)
+                for c in self.columns
+            }
+            for r in self.table.rows.values()
+            if idlutils.row_match(r, self.conditions)
+        ]
diff --git a/neutron/agent/ovsdb/native/connection.py b/neutron/agent/ovsdb/native/connection.py
new file mode 100644 (file)
index 0000000..25ea55e
--- /dev/null
@@ -0,0 +1,87 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import Queue
+import threading
+
+from ovs.db import idl
+from ovs import poller
+
+from neutron.agent.ovsdb.native import idlutils
+
+
+class TransactionQueue(Queue.Queue, object):
+    def __init__(self, *args, **kwargs):
+        super(TransactionQueue, self).__init__(*args, **kwargs)
+        alertpipe = os.pipe()
+        self.alertin = os.fdopen(alertpipe[0], 'r', 0)
+        self.alertout = os.fdopen(alertpipe[1], 'w', 0)
+
+    def get_nowait(self, *args, **kwargs):
+        try:
+            result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
+        except Queue.Empty:
+            return None
+        self.alertin.read(1)
+        return result
+
+    def put(self, *args, **kwargs):
+        super(TransactionQueue, self).put(*args, **kwargs)
+        self.alertout.write('X')
+        self.alertout.flush()
+
+    @property
+    def alert_fileno(self):
+        return self.alertin.fileno()
+
+
+class Connection(object):
+    def __init__(self, connection, timeout):
+        self.idl = None
+        self.connection = connection
+        self.timeout = timeout
+        self.txns = TransactionQueue(1)
+        self.lock = threading.Lock()
+
+    def start(self):
+        with self.lock:
+            if self.idl is not None:
+                return
+
+            helper = idlutils.get_schema_helper(self.connection)
+            helper.register_all()
+            self.idl = idl.Idl(self.connection, helper)
+            idlutils.wait_for_change(self.idl, self.timeout)
+            self.poller = poller.Poller()
+            self.thread = threading.Thread(target=self.run)
+            self.thread.setDaemon(True)
+            self.thread.start()
+
+    def run(self):
+        while True:
+            self.idl.wait(self.poller)
+            self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
+            self.poller.block()
+            self.idl.run()
+            txn = self.txns.get_nowait()
+            if txn is not None:
+                try:
+                    txn.results.put(txn.do_commit())
+                except Exception as ex:
+                    txn.results.put(ex)
+                self.txns.task_done()
+
+    def queue_txn(self, txn):
+        self.txns.put(txn)
diff --git a/neutron/agent/ovsdb/native/idlutils.py b/neutron/agent/ovsdb/native/idlutils.py
new file mode 100644 (file)
index 0000000..89f3224
--- /dev/null
@@ -0,0 +1,118 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import time
+
+from ovs.db import idl
+from ovs import jsonrpc
+from ovs import poller
+from ovs import stream
+
+
+def get_schema_helper(connection):
+    err, strm = stream.Stream.open_block(
+        stream.Stream.open(connection))
+    if err:
+        raise Exception("Could not connect to %s" % (
+            connection,))
+    rpc = jsonrpc.Connection(strm)
+    req = jsonrpc.Message.create_request('get_schema', ['Open_vSwitch'])
+    err, resp = rpc.transact_block(req)
+    rpc.close()
+    if err:
+        raise Exception("Could not retrieve schema from %s: %s" % (
+            connection, os.strerror(err)))
+    elif resp.error:
+        raise Exception(resp.error)
+    return idl.SchemaHelper(None, resp.result)
+
+
+def wait_for_change(_idl, timeout, seqno=None):
+    if seqno is None:
+        seqno = _idl.change_seqno
+    stop = time.time() + timeout
+    while _idl.change_seqno == seqno:
+        _idl.run()
+        ovs_poller = poller.Poller()
+        _idl.wait(ovs_poller)
+        ovs_poller.timer_wait(timeout * 1000)
+        ovs_poller.block()
+        if time.time() > stop:
+            raise Exception("Timeout")
+
+
+def get_column_value(row, col):
+    if col == '_uuid':
+        val = row.uuid
+    else:
+        val = getattr(row, col)
+
+    # Idl returns lists of Rows where ovs-vsctl returns lists of UUIDs
+    if isinstance(val, list) and len(val):
+        if isinstance(val[0], idl.Row):
+            val = [v.uuid for v in val]
+        # ovs-vsctl treats lists of 1 as single results
+        if len(val) == 1:
+            val = val[0]
+    return val
+
+
+def condition_match(row, condition):
+    """Return whether a condition matches a row
+
+    :param row       An OVSDB Row
+    :param condition A 3-tuple containing (column, operation, match)
+    """
+
+    col, op, match = condition
+    val = get_column_value(row, col)
+    matched = True
+
+    # TODO(twilson) Implement other operators and type comparisons
+    # ovs_lib only uses dict '=' and '!=' searches for now
+    if isinstance(match, dict):
+        for key in match:
+            if op == '=':
+                if (key not in val or match[key] != val[key]):
+                    matched = False
+                    break
+            elif op == '!=':
+                if key not in val or match[key] == val[key]:
+                    matched = False
+                    break
+            else:
+                raise NotImplementedError()
+    elif isinstance(match, list):
+        raise NotImplementedError()
+    else:
+        if op == '==' and val != match:
+            matched = False
+        elif op == '!=' and val == match:
+            matched = False
+        else:
+            raise NotImplementedError()
+    return matched
+
+
+def row_match(row, conditions):
+    """Return whether the row matches the list of conditions"""
+    return all(condition_match(row, cond) for cond in conditions)
+
+
+def get_index_column(table):
+    if len(table.indexes) == 1:
+        idx = table.indexes[0]
+        if len(idx) == 1:
+            return idx[0].name
index 49c2a561c21d905c91be09386e8134e6560e6630..68a0051b46e35b0da6ae3edd64b3e20e7f3fb86c 100644 (file)
@@ -129,6 +129,7 @@ class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
 class BaseOVSLinuxTestCase(testscenarios.WithScenarios, BaseLinuxTestCase):
     scenarios = [
         ('vsctl', dict(ovsdb_interface='vsctl')),
+        ('native', dict(ovsdb_interface='native')),
     ]
 
     def setUp(self):
index 3ed3e6313fa7f09044708e3ae675bbdd6ae5e720..eba1361ffb7351092c95e474ce3580dd53558f89 100644 (file)
@@ -54,6 +54,16 @@ class OVSBridgeTestCase(base.BaseOVSLinuxTestCase):
         self.br.delete_port(port_name)
         self.assertFalse(self.br.port_exists(port_name))
 
+    def test_duplicate_port_may_exist_false(self):
+        port_name, ofport = self.create_ovs_port(('type', 'internal'))
+        cmd = self.br.ovsdb.add_port(self.br.br_name,
+                                     port_name, may_exist=False)
+        self.assertRaises(RuntimeError, cmd.execute, check_error=True)
+
+    def test_delete_port_if_exists_false(self):
+        cmd = self.br.ovsdb.del_port('nonexistantport', if_exists=False)
+        self.assertRaises(RuntimeError, cmd.execute, check_error=True)
+
     def test_replace_port(self):
         port_name = base.get_rand_port_name()
         self.br.replace_port(port_name, ('type', 'internal'))
index 2376fdbd509cb848005c870c62619ae40f85d028..3924dbfcbaf51245c95cde6d8bdea81d63886a6e 100644 (file)
@@ -146,22 +146,6 @@ class OVS_Lib_Test(base.BaseTestCase):
     def _build_timeout_opt(self, exp_timeout):
         return "--timeout=%d" % exp_timeout if exp_timeout else self.TO
 
-    def test_replace_port(self):
-        pname = "tap5"
-        self.br.replace_port(pname)
-        self._verify_vsctl_mock("--if-exists", "del-port", pname,
-                                "--", "add-port", self.BR_NAME, pname)
-
-    def test_replace_port_with_attrs(self):
-        pname = "tap5"
-        self.br.replace_port(pname, ('type', 'internal'),
-                             ('external_ids:iface-status', 'active'))
-        self._verify_vsctl_mock("--if-exists", "del-port", pname,
-                                "--", "add-port", self.BR_NAME, pname,
-                                "--", "set", "Interface", pname,
-                                "type=internal",
-                                "external_ids:iface-status=active")
-
     def _test_delete_port(self, exp_timeout=None):
         pname = "tap5"
         self.br.delete_port(pname)